From a2e938c73737e6add518975d4ff4ba84122fb19f Mon Sep 17 00:00:00 2001 From: Bart Janssens Date: Sat, 10 Feb 2018 23:42:23 +0100 Subject: [PATCH 1/4] Add an MPI-3 (window) based IO --- src/MPI.jl | 1 + src/window-io.jl | 294 ++++++++++++++++++++++++++++++++++++++++++ test/test_windowio.jl | 176 +++++++++++++++++++++++++ 3 files changed, 471 insertions(+) create mode 100644 src/window-io.jl create mode 100644 test/test_windowio.jl diff --git a/src/MPI.jl b/src/MPI.jl index fa78b517e..d3d1abe80 100644 --- a/src/MPI.jl +++ b/src/MPI.jl @@ -19,6 +19,7 @@ include(depfile) include("mpi-base.jl") include("cman.jl") +include("window-io.jl") const mpitype_dict = Dict{DataType, Cint}() const mpitype_dict_inverse = Dict{Cint, DataType}() diff --git a/src/window-io.jl b/src/window-io.jl new file mode 100644 index 000000000..34aed0226 --- /dev/null +++ b/src/window-io.jl @@ -0,0 +1,294 @@ +export WindowIO, WindowWriter + +# Type of the number of available entries +const WinCountT = Cint + +mutable struct BufferHeader + count::WinCountT # Number of elements in the buffer + address::Cptrdiff_t + length::WinCountT # Current size of the buffer + needed_length::WinCountT # Size the buffer should have to handle all pending writes +end + +""" + + WindowIO(target::Integer, comm=MPI.COMM_WORLD, bufsize=1024^2) + +Expose an MPI RMA window using the IO interface. Must be constructed on all ranks in the communicator. +The target is the rank to which data is sent when calling write, comm is the communicator to use and +bufsize is the initial size of the buffer. A target may be written to by multiple ranks concurrently, +and the receive buffer will be grown as needed, but never shrinks. +Communication happens only when flush is called. +""" +mutable struct WindowIO <: IO + comm::MPI.Comm + myrank::Int + # Represents the received data. 
First elements contain a counter with the total number of entries in the buffer + buffer::Array{UInt8,1} + win::Win + header::BufferHeader + remote_header::BufferHeader + header_win::Win + header_cwin::CWin + is_open::Bool + # Current read position + ptr::WinCountT + data_available::Condition + read_requested::Condition + lock::ReentrantLock # Needed for Base + waiter + + function WindowIO(comm=MPI.COMM_WORLD, bufsize=1024^2) + buffer = Array{UInt8,1}(bufsize) + header_win = MPI.Win() + header = BufferHeader(0, MPI.Get_address(buffer), bufsize, bufsize) + remote_header = BufferHeader(0, MPI.Get_address(buffer), bufsize, bufsize) + header_arr = unsafe_wrap(Vector{UInt8}, Ptr{UInt8}(pointer_from_objref(header)), sizeof(BufferHeader)) + MPI.Win_create(header_arr, MPI.INFO_NULL, comm, header_win) + win = MPI.Win() + MPI.Win_create_dynamic(MPI.INFO_NULL, comm, win) + MPI.Win_attach(win, buffer) + + w = new(comm, + MPI.Comm_rank(comm), + buffer, + win, + header, + remote_header, + header_win, + CWin(header_win), + true, + 0, + Condition(), + Condition(), + ReentrantLock(), + nothing) + + w.waiter = Task(function() + wait(w.read_requested) + while w.is_open + while !has_data_available(w) && w.is_open + yield() + end + if w.is_open + notify(w.data_available) + wait(w.read_requested) + end + end + end) + + yield(w.waiter) + + return w + end +end + + +Base.nb_available(w::WindowIO)::WinCountT = w.header.count - w.ptr + +# Checks if data is available and grows the buffer if needed by the writing side +function has_data_available(w::WindowIO) + if !w.is_open + return false + end + + if w.header.count > w.ptr && w.header.needed_length == w.header.length # fast check without window sync + return true + end + + # Check if we need to grow the buffer + MPI.Win_sync(w.header_cwin) # CWin version doesn't allocate + if w.header.needed_length > w.header.length + MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win) + MPI.Win_detach(w.win, w.buffer) + resize!(w.buffer, w.header.needed_length) + MPI.Win_attach(w.win, w.buffer) + w.header.address = MPI.Get_address(w.buffer) + w.header.length = w.header.needed_length + MPI.Win_unlock(w.myrank, w.header_win) + end + + return w.header.count > w.ptr +end + +function Base.wait(w::WindowIO) + notify(w.read_requested) + wait(w.data_available) +end + +# Waits for data and returns the number of available bytes +function wait_nb_available(w) + if !has_data_available(w) + wait(w) + end + return nb_available(w) +end + +# wait until the specified number of bytes is available or the stream is closed +function wait_nb_available(w, nb) + nb_found = wait_nb_available(w) + while nb_found < nb && w.is_open + MPI.Win_sync(w.header_cwin) # sync every loop, to make sure we get updates + nb_found = wait_nb_available(w) + end + return nb_found +end + +mutable struct WindowWriter <: IO + winio::WindowIO + target::Int + # Writes are buffered to only lock and communicate upon flush + write_buffer::Vector{UInt8} + lock::ReentrantLock + nb_written::Int + + function WindowWriter(w::WindowIO, target::Integer) + return new(w, target, Vector{UInt8}(1024^2), ReentrantLock(), 0) + end +end + +@inline Base.isopen(w::WindowIO)::Bool = w.is_open +@inline Base.isopen(s::WindowWriter)::Bool = s.winio.is_open + +function Base.eof(w::WindowIO) + if !isopen(w) + return true + else + wait_nb_available(w) + end + return !isopen(w) +end + +Base.iswritable(::WindowIO) = false +Base.isreadable(::WindowIO) = true +Base.iswritable(::WindowWriter) = true +Base.isreadable(::WindowWriter) = false + +function 
Base.close(w::WindowIO) + w.is_open = false + notify(w.read_requested) + wait(w.waiter) # Wait for the data notification loop to finish + MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win) + w.header.count = 0 + w.ptr = 0 + MPI.Win_unlock(w.myrank, w.header_win) + MPI.Barrier(w.comm) + MPI.Win_free(w.win) + MPI.Win_free(w.header_win) +end +Base.close(s::WindowWriter) = nothing + +# Checks if all available data is read, and if so resets the counter with the number of written bytes to 0 +function complete_read(w::WindowIO) + if w.header.count != 0 && w.header.count == w.ptr + MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win) + if w.header.count != 0 && w.header.count == w.ptr # Check again after locking + w.header.count = 0 + w.ptr = 0 + end + MPI.Win_unlock(w.myrank, w.header_win) + end +end + +function Base.read(w::WindowIO, ::Type{UInt8}) + if wait_nb_available(w) < 1 + throw(EOFError()) + end + + w.ptr += 1 + result = w.buffer[w.ptr] + complete_read(w) + return result +end + +function Base.readbytes!(w::WindowIO, b::AbstractVector{UInt8}, nb=length(b); all::Bool=true) + nb_obtained = nb_available(w) + if all + nb_obtained = wait_nb_available(w,nb) + if nb_obtained < nb + throw(EOFError()) + end + resize!(b, nb) + end + nb_read = min(nb_obtained, nb) + if nb_read == 0 + return 0 + end + copy!(b, 1, w.buffer, w.ptr+1, nb_read) + w.ptr += nb_read + complete_read(w) + return nb_read +end + +Base.readavailable(w::WindowIO) = read!(w, Vector{UInt8}(nb_available(w))) + +@inline function Base.unsafe_read(w::WindowIO, p::Ptr{UInt8}, nb::UInt) + nb_obtained = wait_nb_available(w,nb) + nb_read = min(nb_obtained, nb) + unsafe_copy!(p, pointer(w.buffer, w.ptr+1), nb_read) + w.ptr += nb_read + complete_read(w) + if nb_read != nb + throw(EOFError()) + end + return +end + +function Base.read(w::WindowIO, nb::Integer; all::Bool=true) + buf = Vector{UInt8}(nb) + readbytes!(w, buf, nb, all=all) + return buf +end + +function ensureroom(w::WindowWriter) + if w.nb_written > length(w.write_buffer) + resize!(w.write_buffer, w.nb_written) + end +end + +function Base.write(w::WindowWriter, b::UInt8) + w.nb_written += 1 + ensureroom(w) + w.write_buffer[w.nb_written] = b + return sizeof(UInt8) +end +function Base.unsafe_write(w::WindowWriter, p::Ptr{UInt8}, nb::UInt) + offset = w.nb_written+1 + w.nb_written += nb + ensureroom(w) + copy!(w.write_buffer, offset, unsafe_wrap(Array{UInt8}, p, nb), 1, nb) + return nb +end + +Base.flush(::WindowIO) = error("WindowIO is read-only, did you mean to flush an associated WindowWriter?") + +function Base.flush(s::WindowWriter) + if !isopen(s) + throw(EOFError()) + end + nb_to_write = s.nb_written + free = 0 + header = s.winio.remote_header + header_win = s.winio.header_win + while free < nb_to_write + MPI.Win_lock(MPI.LOCK_EXCLUSIVE, s.target, 0, header_win) + MPI.Get(Ptr{UInt8}(pointer_from_objref(header)), sizeof(BufferHeader), s.target, 0, header_win) + MPI.Win_flush(s.target, header_win) + free = header.length - header.count + if free >= nb_to_write + MPI.Win_lock(MPI.LOCK_EXCLUSIVE, s.target, 0, s.winio.win) + MPI.Put(pointer(s.write_buffer), nb_to_write, s.target, header.address + header.count, s.winio.win) + MPI.Win_unlock(s.target, s.winio.win) + MPI.Put(Ref{WinCountT}(header.count + nb_to_write), s.target, header_win) + s.nb_written = 0 + else + # Request to grow buffer, if not done already + new_needed_length = max(header.needed_length, header.count + nb_to_write) + if (new_needed_length > header.needed_length) + header.needed_length = 
new_needed_length + MPI.Put(Ptr{UInt8}(pointer_from_objref(header)), sizeof(BufferHeader), s.target, 0, header_win) + end + end + MPI.Win_unlock(s.target, header_win) + end +end diff --git a/test/test_windowio.jl b/test/test_windowio.jl new file mode 100644 index 000000000..5700c3c7a --- /dev/null +++ b/test/test_windowio.jl @@ -0,0 +1,176 @@ +using Base.Test +using MPI + +MPI.Init() + +comm = MPI.COMM_WORLD + +const rank = MPI.Comm_rank(comm) +const N = MPI.Comm_size(comm) + +if N < 2 + error("This test needs at least 2 processes") + exit(1) +end + +const winio = WindowIO(comm) +const writer = WindowWriter(winio, 0) + +# directly test the different read functions +if rank == 1 + write(writer, UInt8(5)) + flush(writer) # Must flush to trigger communication +elseif rank == 0 + @test MPI.wait_nb_available(winio) == 1 + @test read(winio, UInt8) == UInt8(5) + @test nb_available(winio) == 0 +end + +MPI.Barrier(comm) + +if rank == 1 + write(writer, "hello") + flush(writer) # Must flush to trigger communication +elseif rank == 0 + @test MPI.wait_nb_available(winio) == 5 + arr = Vector{UInt8}(5) + readbytes!(winio, arr) + @test String(arr) == "hello" + @test nb_available(winio) == 0 +end + +MPI.Barrier(comm) + +if rank == 1 + write(writer, "hello") + flush(writer) # Must flush to trigger communication +elseif rank == 0 + @test MPI.wait_nb_available(winio) == 5 + arr = Vector{UInt8}(5) + unsafe_read(winio, pointer(arr), 2) + @test nb_available(winio) == 3 + unsafe_read(winio, pointer(arr,3), 3) + @test String(arr) == "hello" + @test nb_available(winio) == 0 +end + +MPI.Barrier(comm) + +if rank == 1 + write(writer, "hello") + flush(writer) # Must flush to trigger communication +elseif rank == 0 + arr = Vector{UInt8}(3) + readbytes!(winio, arr) # waits for data + @test nb_available(winio) == 2 + @test String(arr) == "hel" + fill!(arr, UInt8('a')) + readbytes!(winio, arr, all=false) # reads what's available + @test String(arr) == "loa" + @test nb_available(winio) == 0 +end + +MPI.Barrier(comm) + +if rank == 1 + write(writer, "hello") + flush(writer) # Must flush to trigger communication +elseif rank == 0 + result = "hello" + for i in 1:5 + @test read(winio, Char) == result[i] + @test nb_available(winio) == 5 - i + end +end + +MPI.Barrier(comm) + +if rank != 0 + write(writer, 1, 2.0, Cint(3)) + flush(writer) # Must flush to trigger communication +else + expected_nb_recv = sizeof(1) + sizeof(2.0) + sizeof(Cint(3)) + result = read(winio, (N-1)*expected_nb_recv) + @test nb_available(winio) == 0 + @test length(result) == (N-1)*expected_nb_recv +end + +MPI.Barrier(comm) + +# Test blocking read +if rank != 0 + write(writer, rank) + flush(writer) # Must flush to trigger communication +else + result = read(winio, (N-1)*sizeof(Int)) # Blocks until all required data is read + @test sort(reinterpret(Int,result)) == collect(1:N-1) + @test nb_available(winio) == 0 +end + +MPI.Barrier(comm) + +# Timing +if rank == 1 + const arr = ones(UInt8,100000) + println("write and flush timings:") + @time write(writer, arr) + @time flush(writer) + @time write(writer, arr) + @time flush(writer) + @time write(writer, arr) + @time flush(writer) + MPI.Barrier(comm) +elseif rank == 0 + const recarr = Vector{UInt8}(100000) + MPI.Barrier(comm) + + println("read timings:") + @time read!(winio, recarr) + @time read!(winio, recarr) + @time read!(winio, recarr) + @test recarr == ones(UInt8,100000) +else + MPI.Barrier(comm) +end + +MPI.Barrier(comm) + +# Test readline +message = "Hi from rank " +if rank != 0 + println(writer, 
message*string(rank)) + flush(writer) # Must flush to trigger communication +else + nb_received = 0 + while nb_received != N-1 + line = readline(winio) + @test startswith(line, message) + @test line[length(message)+1:end] ∈ string.(1:N) + nb_received += 1 + end +end + +MPI.Barrier(comm) +close(winio) +MPI.Barrier(comm) + +# Send more than the buffer can contain +BS = 24 +const winio2 = WindowIO(comm, BS+sizeof(MPI.WinCountT)) +const writer2 = WindowWriter(winio2, 0) +if rank != 0 + write(writer2, rank*ones(BS)) + flush(writer2) # Must flush to trigger communication +else + result = Vector{Float64}(BS*(N-1)) + read!(winio2, result) + header = winio2.header + @test nb_available(winio2) == 0 + @test header.count == 0 + @test header.length == header.needed_length + @test header.length > BS+sizeof(MPI.WinCountT) + @test sum(result) == sum((1:N-1)*BS) +end + +close(winio2) +MPI.Finalize() From 27414280f871ba9dea5bf9095c1d473943dacad9 Mon Sep 17 00:00:00 2001 From: Bart Janssens Date: Sat, 10 Feb 2018 23:43:15 +0100 Subject: [PATCH 2/4] Add a ClusterManager using one-sided communication --- README.md | 9 +- src/MPI.jl | 1 + src/cman.jl | 45 +++++--- src/window-cman.jl | 185 +++++++++++++++++++++++++++++++++ src/window-io.jl | 6 +- test/test_reduce.jl | 31 ++++++ test/test_windowcman.jl | 153 +++++++++++++++++++++++++++ test/test_windowcman_nowait.jl | 64 ++++++++++++ 8 files changed, 473 insertions(+), 21 deletions(-) create mode 100644 src/window-cman.jl create mode 100644 test/test_windowcman.jl create mode 100644 test/test_windowcman_nowait.jl diff --git a/README.md b/README.md index 95d7d55c0..cadd877bf 100644 --- a/README.md +++ b/README.md @@ -166,14 +166,14 @@ Fields `j2mpi` and `mpi2j` of `MPIManager` are associative collections mapping j This launches a total of 5 processes, mpi rank 0 is the julia pid 1. mpi rank 1 is julia pid 2 and so on. -The program must call `MPI.start(TCP_TRANSPORT_ALL)` with argument `TCP_TRANSPORT_ALL`. +The program must call `MPI.start_main_loop(TCP_TRANSPORT_ALL)` with argument `TCP_TRANSPORT_ALL`. On mpi rank 0, it returns a `manager` which can be used with `@mpi_do` On other processes (i.e., the workers) the function does not return ### MPIManager ### (MPI transport - all processes execute MPI code) -`MPI.start` must be called with option `MPI_TRANSPORT_ALL` to use MPI as transport. +`MPI.start_main_loop` must be called with option `MPI_TRANSPORT_ALL` to use MPI as transport. `mpirun -np 5 julia 06-cman-transport.jl MPI` will run the example using MPI as transport. ## Julia MPI-only interface @@ -189,6 +189,11 @@ juliacomm = MPI.COMM_WORLD ccomm = MPI.CComm(juliacomm) ``` +### MPIWindowIOManager +This manager is started using the `MPI_WINDOW_IO` or `MPI_WINDOW_NOWAIT` transports. It uses asynchronous IO +based on MPI windows. The `MPI_WINDOW_NOWAIT` will only use the clustermanager for code preceeded by the `@cluster` +macro. See `test_windowcman.jl` and `test_windowcman_nowait.jl` for examples. 
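A minimal sketch of the `MPI_WINDOW_NOWAIT` mode, condensed from `test_windowcman_nowait.jl` (the script name is illustrative; launch it as e.g. `mpirun -np 4 julia nowait_example.jl`):

```julia
using Distributed, MPI

# All ranks return from start_main_loop and keep executing this script as
# regular MPI code (single program, multiple data).
mgr = MPI.start_main_loop(MPI_WINDOW_NOWAIT)
rank = MPI.Comm_rank(mgr.comm)

# Only code inside @cluster uses the cluster manager: it runs on rank 0
# (Julia pid 1) while the other ranks temporarily act as Distributed workers.
@cluster begin
    @everywhere println("hello from pid $(myid())")
    for w in workers()
        @assert (@fetchfrom w myid()) == w
    end
end

# Shut down: closes the windows and finalizes MPI.
MPI.stop_main_loop(mgr)
```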
+ ### Currently wrapped MPI functions Convention: `MPI_Fun => MPI.Fun` diff --git a/src/MPI.jl b/src/MPI.jl index d3d1abe80..bbeb8963b 100644 --- a/src/MPI.jl +++ b/src/MPI.jl @@ -20,6 +20,7 @@ include(depfile) include("mpi-base.jl") include("cman.jl") include("window-io.jl") +include("window-cman.jl") const mpitype_dict = Dict{DataType, Cint}() const mpitype_dict_inverse = Dict{Cint, DataType}() diff --git a/src/cman.jl b/src/cman.jl index 2111bf0b4..2f53ec68d 100644 --- a/src/cman.jl +++ b/src/cman.jl @@ -1,31 +1,37 @@ import Base: kill export MPIManager, launch, manage, kill, procs, connect, mpiprocs, @mpi_do -export TransportMode, MPI_ON_WORKERS, TCP_TRANSPORT_ALL, MPI_TRANSPORT_ALL +export TransportMode, MPI_ON_WORKERS, TCP_TRANSPORT_ALL, MPI_TRANSPORT_ALL, MPI_WINDOW_IO, MPI_WINDOW_NOWAIT using Compat using Compat.Distributed import Compat.Sockets: connect, listenany, accept, IPv4, getsockname -################################################################################ -# MPI Cluster Manager -# Note: The cluster manager object lives only in the manager process, -# except for MPI_TRANSPORT_ALL +""" +MPI Cluster Manager +Note: The cluster manager object lives only in the manager process, +except for MPI_TRANSPORT_ALL and MPI_WINDOW_IO + +There are four different transport modes: -# There are three different transport modes: +MPI_ON_WORKERS: Use MPI between the workers only, not for the manager. This +allows interactive use from a Julia shell, using the familiar `addprocs` +interface. -# MPI_ON_WORKERS: Use MPI between the workers only, not for the manager. This -# allows interactive use from a Julia shell, using the familiar `addprocs` -# interface. +MPI_TRANSPORT_ALL: Use MPI on all processes; there is no separate manager +process. This corresponds to the "usual" way in which MPI is used in a +headless mode, e.g. submitted as a script to a queueing system. -# MPI_TRANSPORT_ALL: Use MPI on all processes; there is no separate manager -# process. This corresponds to the "usual" way in which MPI is used in a -# headless mode, e.g. submitted as a script to a queueing system. +TCP_TRANSPORT_ALL: Same as MPI_TRANSPORT_ALL, but Julia uses TCP for its +communication between processes. MPI can still be used by the user. -# TCP_TRANSPORT_ALL: Same as MPI_TRANSPORT_ALL, but Julia uses TCP for its -# communication between processes. MPI can still be used by the user. +MPI_WINDOW_IO: Uses the MPI shared memory model with passive communication on all processes. +The program must be started with mpirun or equivalent. -@enum TransportMode MPI_ON_WORKERS MPI_TRANSPORT_ALL TCP_TRANSPORT_ALL +MPI_WINDOW_NOWAIT: Sets up a cluster manager, but only uses it for code enlosed in the @cluster +macro. All other code runs as regular MPI code (single program, multiple data). +""" +@enum TransportMode MPI_ON_WORKERS MPI_TRANSPORT_ALL TCP_TRANSPORT_ALL MPI_WINDOW_IO MPI_WINDOW_NOWAIT mutable struct MPIManager <: ClusterManager np::Int # number of worker processes (excluding the manager process) @@ -319,8 +325,9 @@ end ################################################################################ # Alternative startup model: All Julia processes are started via an external # mpirun, and the user does not call addprocs. 
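# A sketch of this startup model (illustrative only; `driver.jl` is a
# hypothetical script name, launched as `mpirun -np 4 julia driver.jl`):
#
#   using MPI
#   # Workers enter the manager's event loop here and do not return;
#   # only MPI rank 0 (Julia pid 1) continues past this call.
#   mgr = MPI.start_main_loop(MPI_TRANSPORT_ALL)
#   @mpi_do mgr println("Hello from rank ", MPI.Comm_rank(MPI.COMM_WORLD))
#   # Tears the workers down and calls MPI.Finalize
#   MPI.stop_main_loop(mgr)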
- -# Enter the MPI cluster manager's main loop (does not return on the workers) +""" +Enter the MPI cluster manager's main loop (does not return on the workers) +""" function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL; comm::MPI.Comm=MPI.COMM_WORLD) !MPI.Initialized() && MPI.Init() @@ -392,6 +399,10 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL; MPI.Finalize() exit() end + elseif mode == MPI_WINDOW_IO + start_window_worker(comm, true) + elseif mode == MPI_WINDOW_NOWAIT + start_window_worker(comm, false) else error("Unknown mode $mode") end diff --git a/src/window-cman.jl b/src/window-cman.jl new file mode 100644 index 000000000..808c7915e --- /dev/null +++ b/src/window-cman.jl @@ -0,0 +1,185 @@ +import Base: launch, kill, manage, connect +export MPIWindowIOManager, launch, kill, manage, connect, @cluster + +""" +Stores the buffers needed for communication, in one instance per rank. Loop stops when the stop_condition is triggered +""" +mutable struct MPIWindowIOManager <: ClusterManager + comm::MPI.Comm + connection_windows::Vector{WindowIO} + stdio_windows::Vector{WindowIO} + workers_wait::Bool + + function MPIWindowIOManager(comm::MPI.Comm, workers_wait::Bool) + nb_procs = MPI.Comm_size(comm) + connection_windows = Vector{WindowIO}(nb_procs) + stdio_windows = Vector{WindowIO}(nb_procs) + + for i in 1:nb_procs + connection_windows[i] = WindowIO(comm) + stdio_windows[i] = WindowIO(comm) + end + + # Make sure all windows are created before continuing + MPI.Barrier(comm) + + return new(comm, connection_windows, stdio_windows, workers_wait) + end +end + +# Closes all local MPI Windows in a manager. Must be called collectively on all ranks +function closeall(manager::MPIWindowIOManager) + for w in manager.connection_windows + close(w) + end + for w in manager.stdio_windows + close(w) + end +end + +function launch(mgr::MPIWindowIOManager, params::Dict, + instances::Array, cond::Condition) + try + nprocs = MPI.Comm_size(mgr.comm) + for cnt in 1:(nprocs-1) + push!(instances, WorkerConfig()) + end + notify(cond) + catch e + println("Error in MPI launch $e") + rethrow(e) + end +end + +function kill(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig) + @spawnat pid notify(_stop_requested) + Distributed.set_worker_state(Distributed.Worker(pid), Distributed.W_TERMINATED) +end + +function manage(mgr::MPIWindowIOManager, id::Integer, config::WorkerConfig, op::Symbol) end + +function connect(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig) + myrank = MPI.Comm_rank(mgr.comm) + if myrank == 0 + proc_stdio = mgr.stdio_windows[pid] + @schedule while !eof(proc_stdio) + try + println("\tFrom worker $(pid):\t$(readline(proc_stdio))") + catch e + end + end + end + return (mgr.connection_windows[pid], WindowWriter(mgr.connection_windows[myrank+1], pid-1)) +end + +function redirect_to_mpi(s::WindowWriter) + (rd, wr) = redirect_stdout() + @schedule while !eof(rd) && isopen(s.winio) + av = readline(rd) + if isopen(s.winio) + println(s,av) + flush(s) + end + end +end + +function checkworkers() + for w in workers() + if w != (@fetchfrom w myid()) + error("worker $w is not waiting") + end + end +end + +function notify_workers() + for w in workers() + @spawnat(w, notify(_stop_requested)) + end +end + +function wait_for_events() + global _stop_requested + wait(_stop_requested) +end + +""" +Initialize the current process as a Julia parallel worker. Must be called on all ranks. +If comm is not supplied, MPI is initialized and MPI_COMM_WORLD is used. 
+""" +function start_window_worker(comm::Comm, workers_wait) + rank = MPI.Comm_rank(comm) + N = MPI.Comm_size(comm) + + manager = MPIWindowIOManager(comm, workers_wait) + cookie = string(comm) + if length(cookie) > Base.Distributed.HDR_COOKIE_LEN + cookie = cookie[1:Base.Distributed.HDR_COOKIE_LEN] + end + + try + if rank == 0 + Base.cluster_cookie(cookie) + MPI.Barrier(comm) + addprocs(manager) + @assert nprocs() == N + @assert nworkers() == (N == 1 ? 1 : N-1) + + if !workers_wait + checkworkers() + notify_workers() + end + else + init_worker(cookie, manager) + MPI.Barrier(comm) + redirect_to_mpi(WindowWriter(manager.stdio_windows[rank+1], 0)) + for i in vcat([1], (rank+2):N) + # Receiving end of connections to all higher workers and master + Base.process_messages(manager.connection_windows[i], WindowWriter(manager.connection_windows[rank+1], i-1)) + end + + global _stop_requested = Condition() + wait_for_events() + end + catch e + Base.display_error(STDERR,"exception $e on rank $rank",backtrace()) + end + + if workers_wait && rank != 0 + closeall(manager) + MPI.Finalize() + exit(0) + end + + return manager +end + +""" +Stop the manager. This closes all windows and calls MPI.Finalize on all workers +""" +function stop_main_loop(manager::MPIWindowIOManager) + if myid() != 1 + wait_for_events() + else + checkworkers() + if nprocs() > 1 + rmprocs(workers()) + end + end + closeall(manager) + MPI.Finalize() +end + +""" +Runs the given expression using the Julia parallel cluster. Useful when running with MPI_WINDOW_NOWAIT, +since this will temporarily activate the worker event loops to listen for messages. +""" +macro cluster(expr) + quote + if myid() != 1 + wait_for_events() + else + $(esc(expr)) + notify_workers() + end + end +end \ No newline at end of file diff --git a/src/window-io.jl b/src/window-io.jl index 34aed0226..d40e61e14 100644 --- a/src/window-io.jl +++ b/src/window-io.jl @@ -97,16 +97,16 @@ function has_data_available(w::WindowIO) end # Check if we need to grow the buffer + MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win) MPI.Win_sync(w.header_cwin) # CWin version doesn't allocate if w.header.needed_length > w.header.length - MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win) MPI.Win_detach(w.win, w.buffer) resize!(w.buffer, w.header.needed_length) MPI.Win_attach(w.win, w.buffer) w.header.address = MPI.Get_address(w.buffer) w.header.length = w.header.needed_length - MPI.Win_unlock(w.myrank, w.header_win) end + MPI.Win_unlock(w.myrank, w.header_win) return w.header.count > w.ptr end @@ -128,7 +128,9 @@ end function wait_nb_available(w, nb) nb_found = wait_nb_available(w) while nb_found < nb && w.is_open + MPI.Win_lock(MPI.LOCK_SHARED, w.myrank, 0, w.header_win) MPI.Win_sync(w.header_cwin) # sync every loop, to make sure we get updates + MPI.Win_unlock(w.myrank, w.header_win) nb_found = wait_nb_available(w) end return nb_found diff --git a/test/test_reduce.jl b/test/test_reduce.jl index 46432d788..b90258da5 100644 --- a/test/test_reduce.jl +++ b/test/test_reduce.jl @@ -27,4 +27,35 @@ sum_mesg = MPI.Reduce(mesg, MPI.SUM, root, comm) sum_mesg = rank == root ? sum_mesg : size*mesg @test isapprox(norm(sum_mesg-size*mesg), 0.0) +# For comparison with the clustermanager version +const ARRSIZE = 1024^2*100 +@test ARRSIZE % size == 0 +const my_arr = fill(1*(rank+1),ARRSIZE ÷ size) + +function mpi_sum(arr)::Int + mysum = 0 + for x in arr + mysum += x + end + totalsum = MPI.Reduce(mysum, +, 0, comm) + return rank == 0 ? 
totalsum[1] : 0 +end + +const sumresult = mpi_sum(my_arr) +const expected = sum((ARRSIZE ÷ size) * (1:size)) +if rank == 0 + @test sumresult == expected +end +if rank == 0 + println("Timings for MPI reduce:") + @time expected == mpi_sum(my_arr) + @time expected == mpi_sum(my_arr) + @time expected == mpi_sum(my_arr) +else + mpi_sum(my_arr) + mpi_sum(my_arr) + mpi_sum(my_arr) +end + + MPI.Finalize() diff --git a/test/test_windowcman.jl b/test/test_windowcman.jl new file mode 100644 index 000000000..62a0e4416 --- /dev/null +++ b/test/test_windowcman.jl @@ -0,0 +1,153 @@ +const using_mpi = (nprocs() == 1) + +if using_mpi + using MPI + using Base.Test + transport_mode = MPI.MPI_WINDOW_IO # This test can run with MPPI_TRANSPORT_ALL and MPI_WINDOW_IO + mgr = MPI.start_main_loop(transport_mode) + @everywhere const comm = MPI.COMM_WORLD + rank = MPI.Comm_rank(comm) + size = MPI.Comm_size(comm) +else + @everywhere using Base.Test +end + +@everywhere const N = nprocs() + +if using_mpi + @test N == size +end + +@test procs() == collect(1:N) +if N > 1 + @test workers() == collect(2:N) +end + +results = Vector{Any}(N) +for i in 1:N + results[i] = remotecall(myid, i) +end + +for i in 1:N + @test fetch(results[i]) == i +end + +@everywhere function test_nprocs() + @test N == nprocs() +end +for i in 1:N + remotecall_wait(test_nprocs, i) +end + +results = zeros(Int, N) +for i in 1:N + results[i] = fetch(@spawnat i myid() + 1) +end +for i in 1:N + @test results[i] == i+1 +end + +# Check if DistributedArrays is available +has_arrays = true +try + @everywhere using DistributedArrays +catch e + has_arrays = false +end + +if has_arrays + @everywhere begin + const ARRSIZE = 1024^2*100 + @test ARRSIZE % N == 0 + + function myrange() + my_nb_indices = ARRSIZE ÷ N + range_start = ((myid()-1)*my_nb_indices+1) + return range_start:range_start+my_nb_indices-1 + end + end + + const d_arr = dfill(1,(ARRSIZE,), procs(), (N,)) + for i in 1:N + @test fetch(@spawnat i localindexes(d_arr))[1] == remotecall_fetch(myrange, i) + end + + println("Normal array sum timings:") + const arr = fill(1,ARRSIZE) + @test sum(arr) == ARRSIZE + @time sum(arr) + @time sum(arr) + @time sum(arr) + + println("DArray sum timings:") + @test sum(d_arr) == ARRSIZE + @time sum(d_arr) + @time sum(d_arr) + @time sum(d_arr) + + @everywhere const my_arr = fill(1*myid(),ARRSIZE ÷ N) + + @everywhere function do_sum(result) + put!(result, sum(my_arr)) + return + end + + function collect_sum() + result = sum(my_arr) + result_chan = RemoteChannel(()->Channel{Int}(N-1)) + for p in workers() + remote_do(do_sum, p, result_chan) + end + nb_collected = 0 + while nb_collected != N-1 + result += take!(result_chan) + nb_collected += 1 + end + return result + end + + println("Split array sum timings:") + @test collect_sum() == sum((ARRSIZE ÷ N) * (1:N)) + @time collect_sum() + @time collect_sum() + @time collect_sum() + + if using_mpi + @everywhere function mpi_sum() + global my_arr + my_sum = [0] + for x in my_arr + my_sum[1] += x + end + return MPI.Reduce(my_sum, MPI.SUM, 0, comm) + end + + function do_mpi_sum() + for w in workers() + @spawnat w mpi_sum() + end + return mpi_sum()[1] + end + + println("MPI_Reduce sum timings:") + @test do_mpi_sum() == sum((ARRSIZE ÷ N) * (1:N)) + @time do_mpi_sum() + @time do_mpi_sum() + @time do_mpi_sum() + end +end + +const the_channel = RemoteChannel() +for p in workers() + @spawnat p put!(the_channel, myid()) +end + +chan_results = [] +for i in 2:N + push!(chan_results, take!(the_channel)) +end +@test sort(chan_results) == 
workers() + +if using_mpi + MPI.stop_main_loop(mgr) +end \ No newline at end of file diff --git a/test/test_windowcman_nowait.jl b/test/test_windowcman_nowait.jl new file mode 100644 index 000000000..344b5a464 --- /dev/null +++ b/test/test_windowcman_nowait.jl @@ -0,0 +1,64 @@ +using MPI +using Base.Test + +mgr = MPI.start_main_loop(MPI_WINDOW_NOWAIT) + +comm = mgr.comm +rank = MPI.Comm_rank(comm) +size = MPI.Comm_size(comm) +@test rank == myid()-1 +@test size == nprocs() + +const ARRSIZE = 1024^2*100 +@test ARRSIZE % size == 0 +const my_arr = fill(1*(rank+1),ARRSIZE ÷ size) + +function mpi_sum(arr)::Int + mysum = 0 + for x in arr + mysum += x + end + totalsum = MPI.Reduce(mysum, +, 0, comm) + return rank == 0 ? totalsum[1] : 0 +end + +const sumresult = mpi_sum(my_arr) +const expected = sum((ARRSIZE ÷ size) * (1:size)) +if rank == 0 + @test sumresult == expected +end +if rank == 0 + println("Timings for MPI_WINDOW_NOWAIT reduce:") + for i in 1:5 + @time mpi_sum(my_arr) + end +else + for i in 1:5 + mpi_sum(my_arr) + end +end + +# Do some Julia cluster stuff +@cluster begin + @everywhere println("test") + + for w in workers() + @test @fetchfrom w myid() == w + end +end + +sleep(1) + +# Verify that the reduce still works +if rank == 0 + println("Timing after cluster communication:") + for i in 1:5 + @time mpi_sum(my_arr) + end +else + for i in 1:5 + mpi_sum(my_arr) + end +end + +MPI.stop_main_loop(mgr) From aed7b031ede24bcbdb6d15283c040a4738dcd63f Mon Sep 17 00:00:00 2001 From: Bart Janssens Date: Mon, 26 Feb 2018 08:05:48 +0100 Subject: [PATCH 3/4] Simplify wait_nb_available in WindowIO --- src/window-io.jl | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/src/window-io.jl b/src/window-io.jl index d40e61e14..930e5af59 100644 --- a/src/window-io.jl +++ b/src/window-io.jl @@ -29,7 +29,6 @@ mutable struct WindowIO <: IO header::BufferHeader remote_header::BufferHeader header_win::Win - header_cwin::CWin is_open::Bool # Current read position ptr::WinCountT @@ -56,7 +55,6 @@ mutable struct WindowIO <: IO header, remote_header, header_win, - CWin(header_win), true, 0, Condition(), @@ -92,23 +90,23 @@ function has_data_available(w::WindowIO) return false end - if w.header.count > w.ptr && w.header.needed_length == w.header.length # fast check without window sync - return true - end - - # Check if we need to grow the buffer - MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win) - MPI.Win_sync(w.header_cwin) # CWin version doesn't allocate - if w.header.needed_length > w.header.length + MPI.Win_lock(MPI.LOCK_SHARED, w.myrank, 0, w.header_win) + have_data = w.header.count > w.ptr + need_grow = w.header.needed_length > w.header.length + MPI.Win_unlock(w.myrank, w.header_win) + + # Grow buffer if needed + if need_grow MPI.Win_detach(w.win, w.buffer) resize!(w.buffer, w.header.needed_length) MPI.Win_attach(w.win, w.buffer) + MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win) w.header.address = MPI.Get_address(w.buffer) w.header.length = w.header.needed_length + MPI.Win_unlock(w.myrank, w.header_win) end - MPI.Win_unlock(w.myrank, w.header_win) - return w.header.count > w.ptr + return have_data end function Base.wait(w::WindowIO) @@ -126,11 +124,8 @@ end # wait until the specified number of bytes is available or the stream is closed function wait_nb_available(w, nb) - nb_found = wait_nb_available(w) + nb_found = 0 while nb_found < nb && w.is_open - MPI.Win_lock(MPI.LOCK_SHARED, w.myrank, 0, w.header_win) - MPI.Win_sync(w.header_cwin) # sync 
every loop, to make sure we get updates - MPI.Win_unlock(w.myrank, w.header_win) nb_found = wait_nb_available(w) end return nb_found From 5cb223f7da7e578d6b43cbd2a120406eae5036ef Mon Sep 17 00:00:00 2001 From: Bart Janssens Date: Sun, 5 Aug 2018 10:32:10 +0100 Subject: [PATCH 4/4] Update for Julia v0.7 --- src/window-cman.jl | 22 ++++++++++++---------- src/window-io.jl | 16 ++++++++-------- test/test_windowcman.jl | 10 ++++++---- test/test_windowcman_nowait.jl | 3 ++- test/test_windowio.jl | 18 +++++++++--------- 5 files changed, 37 insertions(+), 32 deletions(-) diff --git a/src/window-cman.jl b/src/window-cman.jl index 808c7915e..6a4f8e29d 100644 --- a/src/window-cman.jl +++ b/src/window-cman.jl @@ -1,4 +1,6 @@ -import Base: launch, kill, manage, connect +import Base: kill +using Distributed +import Distributed: launch, kill, manage, connect export MPIWindowIOManager, launch, kill, manage, connect, @cluster """ @@ -12,8 +14,8 @@ mutable struct MPIWindowIOManager <: ClusterManager function MPIWindowIOManager(comm::MPI.Comm, workers_wait::Bool) nb_procs = MPI.Comm_size(comm) - connection_windows = Vector{WindowIO}(nb_procs) - stdio_windows = Vector{WindowIO}(nb_procs) + connection_windows = Vector{WindowIO}(undef,nb_procs) + stdio_windows = Vector{WindowIO}(undef,nb_procs) for i in 1:nb_procs connection_windows[i] = WindowIO(comm) @@ -62,7 +64,7 @@ function connect(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig) myrank = MPI.Comm_rank(mgr.comm) if myrank == 0 proc_stdio = mgr.stdio_windows[pid] - @schedule while !eof(proc_stdio) + @async while !eof(proc_stdio) try println("\tFrom worker $(pid):\t$(readline(proc_stdio))") catch e @@ -74,7 +76,7 @@ end function redirect_to_mpi(s::WindowWriter) (rd, wr) = redirect_stdout() - @schedule while !eof(rd) && isopen(s.winio) + @async while !eof(rd) && isopen(s.winio) av = readline(rd) if isopen(s.winio) println(s,av) @@ -112,13 +114,13 @@ function start_window_worker(comm::Comm, workers_wait) manager = MPIWindowIOManager(comm, workers_wait) cookie = string(comm) - if length(cookie) > Base.Distributed.HDR_COOKIE_LEN - cookie = cookie[1:Base.Distributed.HDR_COOKIE_LEN] + if length(cookie) > Distributed.HDR_COOKIE_LEN + cookie = cookie[1:Distributed.HDR_COOKIE_LEN] end try if rank == 0 - Base.cluster_cookie(cookie) + Distributed.cluster_cookie(cookie) MPI.Barrier(comm) addprocs(manager) @assert nprocs() == N @@ -134,14 +136,14 @@ function start_window_worker(comm::Comm, workers_wait) redirect_to_mpi(WindowWriter(manager.stdio_windows[rank+1], 0)) for i in vcat([1], (rank+2):N) # Receiving end of connections to all higher workers and master - Base.process_messages(manager.connection_windows[i], WindowWriter(manager.connection_windows[rank+1], i-1)) + Distributed.process_messages(manager.connection_windows[i], WindowWriter(manager.connection_windows[rank+1], i-1)) end global _stop_requested = Condition() wait_for_events() end catch e - Base.display_error(STDERR,"exception $e on rank $rank",backtrace()) + Base.display_error(stderr,"exception $e on rank $rank",backtrace()) end if workers_wait && rank != 0 diff --git a/src/window-io.jl b/src/window-io.jl index 930e5af59..ecd26b875 100644 --- a/src/window-io.jl +++ b/src/window-io.jl @@ -38,7 +38,7 @@ mutable struct WindowIO <: IO waiter function WindowIO(comm=MPI.COMM_WORLD, bufsize=1024^2) - buffer = Array{UInt8,1}(bufsize) + buffer = Array{UInt8,1}(undef, bufsize) header_win = MPI.Win() header = BufferHeader(0, MPI.Get_address(buffer), bufsize, bufsize) remote_header = BufferHeader(0, 
MPI.Get_address(buffer), bufsize, bufsize) @@ -140,7 +140,7 @@ mutable struct WindowWriter <: IO nb_written::Int function WindowWriter(w::WindowIO, target::Integer) - return new(w, target, Vector{UInt8}(1024^2), ReentrantLock(), 0) + return new(w, target, Vector{UInt8}(undef,1024^2), ReentrantLock(), 0) end end @@ -164,7 +164,7 @@ Base.isreadable(::WindowWriter) = false function Base.close(w::WindowIO) w.is_open = false notify(w.read_requested) - wait(w.waiter) # Wait for the data notification loop to finish + fetch(w.waiter) # Wait for the data notification loop to finish MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win) w.header.count = 0 w.ptr = 0 @@ -211,18 +211,18 @@ function Base.readbytes!(w::WindowIO, b::AbstractVector{UInt8}, nb=length(b); al if nb_read == 0 return 0 end - copy!(b, 1, w.buffer, w.ptr+1, nb_read) + copyto!(b, 1, w.buffer, w.ptr+1, nb_read) w.ptr += nb_read complete_read(w) return nb_read end -Base.readavailable(w::WindowIO) = read!(w, Vector{UInt8}(nb_available(w))) +Base.readavailable(w::WindowIO) = read!(w, Vector{UInt8}(undef,nb_available(w))) @inline function Base.unsafe_read(w::WindowIO, p::Ptr{UInt8}, nb::UInt) nb_obtained = wait_nb_available(w,nb) nb_read = min(nb_obtained, nb) - unsafe_copy!(p, pointer(w.buffer, w.ptr+1), nb_read) + unsafe_copyto!(p, pointer(w.buffer, w.ptr+1), nb_read) w.ptr += nb_read complete_read(w) if nb_read != nb @@ -232,7 +232,7 @@ Base.readavailable(w::WindowIO) = read!(w, Vector{UInt8}(nb_available(w))) end function Base.read(w::WindowIO, nb::Integer; all::Bool=true) - buf = Vector{UInt8}(nb) + buf = Vector{UInt8}(undef,nb) readbytes!(w, buf, nb, all=all) return buf end @@ -253,7 +253,7 @@ function Base.unsafe_write(w::WindowWriter, p::Ptr{UInt8}, nb::UInt) offset = w.nb_written+1 w.nb_written += nb ensureroom(w) - copy!(w.write_buffer, offset, unsafe_wrap(Array{UInt8}, p, nb), 1, nb) + copyto!(w.write_buffer, offset, unsafe_wrap(Array{UInt8}, p, nb), 1, nb) return nb end diff --git a/test/test_windowcman.jl b/test/test_windowcman.jl index 62a0e4416..14136f2e7 100644 --- a/test/test_windowcman.jl +++ b/test/test_windowcman.jl @@ -1,15 +1,17 @@ +using Distributed + const using_mpi = (nprocs() == 1) if using_mpi using MPI - using Base.Test + using Test transport_mode = MPI.MPI_WINDOW_IO # This test can run with MPPI_TRANSPORT_ALL and MPI_WINDOW_IO mgr = MPI.start_main_loop(transport_mode) @everywhere const comm = MPI.COMM_WORLD rank = MPI.Comm_rank(comm) size = MPI.Comm_size(comm) else - @everywhere using Base.Test + @everywhere using Test end @everywhere const N = nprocs() @@ -23,7 +25,7 @@ if N > 1 @test workers() == collect(2:N) end -results = Vector{Any}(N) +results = Vector{Any}(undef,N) for i in 1:N results[i] = remotecall(myid, i) end @@ -52,7 +54,7 @@ has_arrays = true try @everywhere using DistributedArrays catch e - has_arrays = false + global has_arrays = false end if has_arrays diff --git a/test/test_windowcman_nowait.jl b/test/test_windowcman_nowait.jl index 344b5a464..ea19039fa 100644 --- a/test/test_windowcman_nowait.jl +++ b/test/test_windowcman_nowait.jl @@ -1,5 +1,6 @@ +using Distributed using MPI -using Base.Test +using Test mgr = MPI.start_main_loop(MPI_WINDOW_NOWAIT) diff --git a/test/test_windowio.jl b/test/test_windowio.jl index 5700c3c7a..0f6bb82ea 100644 --- a/test/test_windowio.jl +++ b/test/test_windowio.jl @@ -1,4 +1,4 @@ -using Base.Test +using Test using MPI MPI.Init() @@ -33,7 +33,7 @@ if rank == 1 flush(writer) # Must flush to trigger communication elseif rank == 0 @test 
MPI.wait_nb_available(winio) == 5 - arr = Vector{UInt8}(5) + arr = Vector{UInt8}(undef,5) readbytes!(winio, arr) @test String(arr) == "hello" @test nb_available(winio) == 0 @@ -46,7 +46,7 @@ if rank == 1 flush(writer) # Must flush to trigger communication elseif rank == 0 @test MPI.wait_nb_available(winio) == 5 - arr = Vector{UInt8}(5) + arr = Vector{UInt8}(undef,5) unsafe_read(winio, pointer(arr), 2) @test nb_available(winio) == 3 unsafe_read(winio, pointer(arr,3), 3) @@ -60,10 +60,10 @@ if rank == 1 write(writer, "hello") flush(writer) # Must flush to trigger communication elseif rank == 0 - arr = Vector{UInt8}(3) + arr = Vector{UInt8}(undef,3) readbytes!(winio, arr) # waits for data @test nb_available(winio) == 2 - @test String(arr) == "hel" + @test String(copy(arr)) == "hel" fill!(arr, UInt8('a')) readbytes!(winio, arr, all=false) # reads what's available @test String(arr) == "loa" @@ -121,7 +121,7 @@ if rank == 1 @time flush(writer) MPI.Barrier(comm) elseif rank == 0 - const recarr = Vector{UInt8}(100000) + const recarr = Vector{UInt8}(undef,100000) MPI.Barrier(comm) println("read timings:") @@ -141,12 +141,12 @@ if rank != 0 println(writer, message*string(rank)) flush(writer) # Must flush to trigger communication else - nb_received = 0 + global nb_received = 0 while nb_received != N-1 line = readline(winio) @test startswith(line, message) @test line[length(message)+1:end] ∈ string.(1:N) - nb_received += 1 + global nb_received += 1 end end @@ -162,7 +162,7 @@ if rank != 0 write(writer2, rank*ones(BS)) flush(writer2) # Must flush to trigger communication else - result = Vector{Float64}(BS*(N-1)) + result = Vector{Float64}(undef,BS*(N-1)) read!(winio2, result) header = winio2.header @test nb_available(winio2) == 0
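# The pattern these tests exercise, as a minimal point-to-point sketch
# (assumes at least two ranks; run under mpirun):
#
#   using MPI
#   MPI.Init()
#   comm = MPI.COMM_WORLD
#   # Collective: every rank in the communicator must construct the WindowIO
#   winio  = WindowIO(comm)
#   writer = WindowWriter(winio, 0)   # writes target rank 0
#   if MPI.Comm_rank(comm) == 1
#       write(writer, "hello")
#       flush(writer)                 # communication happens only on flush
#   elseif MPI.Comm_rank(comm) == 0
#       buf = Vector{UInt8}(undef, 5)
#       readbytes!(winio, buf)        # blocks until 5 bytes have arrived
#       @assert String(buf) == "hello"
#   end
#   close(winio)                      # collective, like the constructor
#   MPI.Finalize()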