Add WindowIO and MPIWindowIOManager #203

Open
wants to merge 4 commits into master
9 changes: 7 additions & 2 deletions README.md
@@ -166,14 +166,14 @@ Fields `j2mpi` and `mpi2j` of `MPIManager` are associative collections mapping j

This launches a total of 5 processes: MPI rank 0 is Julia pid 1, MPI rank 1 is Julia pid 2, and so on.

The program must call `MPI.start(TCP_TRANSPORT_ALL)` with argument `TCP_TRANSPORT_ALL`.
The program must call `MPI.start_main_loop(TCP_TRANSPORT_ALL)` with argument `TCP_TRANSPORT_ALL`.
On MPI rank 0, it returns a `manager` which can be used with `@mpi_do`.
On the other processes (i.e., the workers) the function does not return.
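
A minimal driver script for this mode might look like the following sketch (the file name and the work submitted via `@mpi_do` are illustrative, and the shutdown call `MPI.stop_main_loop` is assumed from the existing manager interface):

```julia
# tcp_driver.jl (illustrative name); launch with e.g. `mpirun -np 5 julia tcp_driver.jl`
using MPI

# Returns an MPIManager on MPI rank 0 and does not return on the workers
manager = MPI.start_main_loop(TCP_TRANSPORT_ALL)

# Only rank 0 (Julia pid 1) reaches this point; run code on every worker
@mpi_do manager println("MPI rank: ", MPI.Comm_rank(MPI.COMM_WORLD))

# Shut down the workers and finalize MPI
MPI.stop_main_loop(manager)
```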


### MPIManager
### (MPI transport - all processes execute MPI code)
`MPI.start` must be called with option `MPI_TRANSPORT_ALL` to use MPI as transport.
`MPI.start_main_loop` must be called with option `MPI_TRANSPORT_ALL` to use MPI as transport.
`mpirun -np 5 julia 06-cman-transport.jl MPI` will run the example using MPI as transport.
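
The driver pattern is the same as with TCP transport; only the mode argument changes, as in this sketch:

```julia
manager = MPI.start_main_loop(MPI_TRANSPORT_ALL)  # Julia's own messages now travel over MPI
@mpi_do manager println("MPI rank: ", MPI.Comm_rank(MPI.COMM_WORLD))
MPI.stop_main_loop(manager)
```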

## Julia MPI-only interface
@@ -189,6 +189,11 @@ juliacomm = MPI.COMM_WORLD
ccomm = MPI.CComm(juliacomm)
```

### MPIWindowIOManager
This manager is started using the `MPI_WINDOW_IO` or `MPI_WINDOW_NOWAIT` transports. It uses asynchronous IO
based on MPI windows. The `MPI_WINDOW_NOWAIT` mode only uses the cluster manager for code preceded by the `@cluster`
macro. See `test_windowcman.jl` and `test_windowcman_nowait.jl` for examples.
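
A rough sketch of an `MPI_WINDOW_NOWAIT` script (the file name and the work inside `@cluster` are illustrative) could look like this:

```julia
# window_nowait_driver.jl (illustrative name); launch with e.g. `mpirun -np 4 julia window_nowait_driver.jl`
using MPI, Distributed

# Sets up the MPIWindowIOManager; every rank continues executing this script
manager = MPI.start_main_loop(MPI_WINDOW_NOWAIT)

# Regular MPI code runs on all ranks here (single program, multiple data)
rank = MPI.Comm_rank(MPI.COMM_WORLD)

# Code enclosed in @cluster is driven by rank 0 through the cluster manager,
# while the other ranks temporarily serve their worker event loops
@cluster begin
    for w in workers()
        println("worker $w is attached")
    end
end

# Closes the windows and finalizes MPI on all ranks
MPI.stop_main_loop(manager)
```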

### Currently wrapped MPI functions
Convention: `MPI_Fun => MPI.Fun`

2 changes: 2 additions & 0 deletions src/MPI.jl
@@ -19,6 +19,8 @@ include(depfile)

include("mpi-base.jl")
include("cman.jl")
include("window-io.jl")
include("window-cman.jl")

const mpitype_dict = Dict{DataType, Cint}()
const mpitype_dict_inverse = Dict{Cint, DataType}()
45 changes: 28 additions & 17 deletions src/cman.jl
@@ -1,31 +1,37 @@
import Base: kill
export MPIManager, launch, manage, kill, procs, connect, mpiprocs, @mpi_do
export TransportMode, MPI_ON_WORKERS, TCP_TRANSPORT_ALL, MPI_TRANSPORT_ALL
export TransportMode, MPI_ON_WORKERS, TCP_TRANSPORT_ALL, MPI_TRANSPORT_ALL, MPI_WINDOW_IO, MPI_WINDOW_NOWAIT
using Compat
using Compat.Distributed
import Compat.Sockets: connect, listenany, accept, IPv4, getsockname



################################################################################
# MPI Cluster Manager
# Note: The cluster manager object lives only in the manager process,
# except for MPI_TRANSPORT_ALL
"""
MPI Cluster Manager
Note: The cluster manager object lives only in the manager process,
except for MPI_TRANSPORT_ALL and MPI_WINDOW_IO

There are five different transport modes:

# There are three different transport modes:
MPI_ON_WORKERS: Use MPI between the workers only, not for the manager. This
allows interactive use from a Julia shell, using the familiar `addprocs`
interface.

# MPI_ON_WORKERS: Use MPI between the workers only, not for the manager. This
# allows interactive use from a Julia shell, using the familiar `addprocs`
# interface.
MPI_TRANSPORT_ALL: Use MPI on all processes; there is no separate manager
process. This corresponds to the "usual" way in which MPI is used in a
headless mode, e.g. submitted as a script to a queueing system.

# MPI_TRANSPORT_ALL: Use MPI on all processes; there is no separate manager
# process. This corresponds to the "usual" way in which MPI is used in a
# headless mode, e.g. submitted as a script to a queueing system.
TCP_TRANSPORT_ALL: Same as MPI_TRANSPORT_ALL, but Julia uses TCP for its
communication between processes. MPI can still be used by the user.

# TCP_TRANSPORT_ALL: Same as MPI_TRANSPORT_ALL, but Julia uses TCP for its
# communication between processes. MPI can still be used by the user.
MPI_WINDOW_IO: Uses the MPI shared memory model with passive communication on all processes.
The program must be started with mpirun or equivalent.

@enum TransportMode MPI_ON_WORKERS MPI_TRANSPORT_ALL TCP_TRANSPORT_ALL
MPI_WINDOW_NOWAIT: Sets up a cluster manager, but only uses it for code enclosed in the @cluster
macro. All other code runs as regular MPI code (single program, multiple data).
"""
@enum TransportMode MPI_ON_WORKERS MPI_TRANSPORT_ALL TCP_TRANSPORT_ALL MPI_WINDOW_IO MPI_WINDOW_NOWAIT

mutable struct MPIManager <: ClusterManager
np::Int # number of worker processes (excluding the manager process)
@@ -319,8 +325,9 @@ end
################################################################################
# Alternative startup model: All Julia processes are started via an external
# mpirun, and the user does not call addprocs.

# Enter the MPI cluster manager's main loop (does not return on the workers)
"""
Enter the MPI cluster manager's main loop (does not return on the workers)
"""
function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
comm::MPI.Comm=MPI.COMM_WORLD)
!MPI.Initialized() && MPI.Init()
@@ -392,6 +399,10 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
MPI.Finalize()
exit()
end
elseif mode == MPI_WINDOW_IO
start_window_worker(comm, true)
elseif mode == MPI_WINDOW_NOWAIT
start_window_worker(comm, false)
else
error("Unknown mode $mode")
end
187 changes: 187 additions & 0 deletions src/window-cman.jl
@@ -0,0 +1,187 @@
import Base: kill
using Distributed
import Distributed: launch, kill, manage, connect
export MPIWindowIOManager, launch, kill, manage, connect, @cluster

"""
Stores the buffers needed for communication, one instance per rank. The worker loop stops when the stop condition is triggered.
"""
mutable struct MPIWindowIOManager <: ClusterManager
comm::MPI.Comm
connection_windows::Vector{WindowIO}
stdio_windows::Vector{WindowIO}
workers_wait::Bool

function MPIWindowIOManager(comm::MPI.Comm, workers_wait::Bool)
nb_procs = MPI.Comm_size(comm)
connection_windows = Vector{WindowIO}(undef,nb_procs)
stdio_windows = Vector{WindowIO}(undef,nb_procs)

for i in 1:nb_procs
connection_windows[i] = WindowIO(comm)
stdio_windows[i] = WindowIO(comm)
end

# Make sure all windows are created before continuing
MPI.Barrier(comm)

return new(comm, connection_windows, stdio_windows, workers_wait)
end
end

# Closes all local MPI Windows in a manager. Must be called collectively on all ranks
function closeall(manager::MPIWindowIOManager)
for w in manager.connection_windows
close(w)
end
for w in manager.stdio_windows
close(w)
end
end

function launch(mgr::MPIWindowIOManager, params::Dict,
instances::Array, cond::Condition)
try
nprocs = MPI.Comm_size(mgr.comm)
for cnt in 1:(nprocs-1)
push!(instances, WorkerConfig())
end
notify(cond)
catch e
println("Error in MPI launch $e")
rethrow(e)
end
end

function kill(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig)
@spawnat pid notify(_stop_requested)
Distributed.set_worker_state(Distributed.Worker(pid), Distributed.W_TERMINATED)
end

function manage(mgr::MPIWindowIOManager, id::Integer, config::WorkerConfig, op::Symbol) end

function connect(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig)
myrank = MPI.Comm_rank(mgr.comm)
if myrank == 0
proc_stdio = mgr.stdio_windows[pid]
@async while !eof(proc_stdio)
try
println("\tFrom worker $(pid):\t$(readline(proc_stdio))")
catch e
end
end
end
return (mgr.connection_windows[pid], WindowWriter(mgr.connection_windows[myrank+1], pid-1))
end

function redirect_to_mpi(s::WindowWriter)
(rd, wr) = redirect_stdout()
@async while !eof(rd) && isopen(s.winio)
av = readline(rd)
if isopen(s.winio)
println(s,av)
flush(s)
end
end
end

function checkworkers()
for w in workers()
if w != (@fetchfrom w myid())
error("worker $w is not waiting")
end
end
end

function notify_workers()
for w in workers()
@spawnat(w, notify(_stop_requested))
end
end

function wait_for_events()
global _stop_requested
wait(_stop_requested)
end

"""
Initialize the current process as a Julia parallel worker. Must be called on all ranks.
If comm is not supplied, MPI is initialized and MPI_COMM_WORLD is used.
"""
function start_window_worker(comm::Comm, workers_wait)
rank = MPI.Comm_rank(comm)
N = MPI.Comm_size(comm)

manager = MPIWindowIOManager(comm, workers_wait)
cookie = string(comm)
if length(cookie) > Distributed.HDR_COOKIE_LEN
cookie = cookie[1:Distributed.HDR_COOKIE_LEN]
end

try
if rank == 0
Distributed.cluster_cookie(cookie)
MPI.Barrier(comm)
addprocs(manager)
@assert nprocs() == N
@assert nworkers() == (N == 1 ? 1 : N-1)

if !workers_wait
checkworkers()
notify_workers()
end
else
init_worker(cookie, manager)
MPI.Barrier(comm)
redirect_to_mpi(WindowWriter(manager.stdio_windows[rank+1], 0))
for i in vcat([1], (rank+2):N)
# Receiving end of connections to all higher workers and master
Distributed.process_messages(manager.connection_windows[i], WindowWriter(manager.connection_windows[rank+1], i-1))
end

global _stop_requested = Condition()
wait_for_events()
end
catch e
Base.display_error(stderr,"exception $e on rank $rank",backtrace())
end

if workers_wait && rank != 0
closeall(manager)
MPI.Finalize()
exit(0)
end

return manager
end

"""
Stop the manager. This closes all windows and calls `MPI.Finalize` on all workers.
"""
function stop_main_loop(manager::MPIWindowIOManager)
if myid() != 1
wait_for_events()
else
checkworkers()
if nprocs() > 1
rmprocs(workers())
end
end
closeall(manager)
MPI.Finalize()
end

"""
Runs the given expression using the Julia parallel cluster. Useful when running with MPI_WINDOW_NOWAIT,
since this will temporarily activate the worker event loops to listen for messages.
"""
macro cluster(expr)
quote
if myid() != 1
wait_for_events()
else
$(esc(expr))
notify_workers()
end
end
end
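
Tying the pieces of this file together, a rough usage sketch for the blocking `MPI_WINDOW_IO` mode (the file name and the remote calls are illustrative) might be:

```julia
# window_io_driver.jl (illustrative name); launch with e.g. `mpirun -np 4 julia window_io_driver.jl`
using MPI, Distributed

# Workers enter their window-based event loop and block here;
# only MPI rank 0 (Julia pid 1) returns with the manager
manager = MPI.start_main_loop(MPI_WINDOW_IO)

# Rank 0 can now use the regular Distributed API over the window transport
@everywhere println("pid $(myid()) is alive")
result = @fetchfrom 2 2 + 2

# Stops the workers, closes all windows and finalizes MPI
MPI.stop_main_loop(manager)
```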