Skip to content

Commit

Permalink
Update for Julia v0.7
Browse files Browse the repository at this point in the history
  • Loading branch information
barche committed Aug 5, 2018
1 parent aed7b03 commit 5cb223f
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 32 deletions.
22 changes: 12 additions & 10 deletions src/window-cman.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import Base: launch, kill, manage, connect
import Base: kill
using Distributed
import Distributed: launch, kill, manage, connect
export MPIWindowIOManager, launch, kill, manage, connect, @cluster

"""
Expand All @@ -12,8 +14,8 @@ mutable struct MPIWindowIOManager <: ClusterManager

function MPIWindowIOManager(comm::MPI.Comm, workers_wait::Bool)
nb_procs = MPI.Comm_size(comm)
connection_windows = Vector{WindowIO}(nb_procs)
stdio_windows = Vector{WindowIO}(nb_procs)
connection_windows = Vector{WindowIO}(undef,nb_procs)
stdio_windows = Vector{WindowIO}(undef,nb_procs)

for i in 1:nb_procs
connection_windows[i] = WindowIO(comm)
Expand Down Expand Up @@ -62,7 +64,7 @@ function connect(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig)
myrank = MPI.Comm_rank(mgr.comm)
if myrank == 0
proc_stdio = mgr.stdio_windows[pid]
@schedule while !eof(proc_stdio)
@async while !eof(proc_stdio)
try
println("\tFrom worker $(pid):\t$(readline(proc_stdio))")
catch e
Expand All @@ -74,7 +76,7 @@ end

function redirect_to_mpi(s::WindowWriter)
(rd, wr) = redirect_stdout()
@schedule while !eof(rd) && isopen(s.winio)
@async while !eof(rd) && isopen(s.winio)
av = readline(rd)
if isopen(s.winio)
println(s,av)
Expand Down Expand Up @@ -112,13 +114,13 @@ function start_window_worker(comm::Comm, workers_wait)

manager = MPIWindowIOManager(comm, workers_wait)
cookie = string(comm)
if length(cookie) > Base.Distributed.HDR_COOKIE_LEN
cookie = cookie[1:Base.Distributed.HDR_COOKIE_LEN]
if length(cookie) > Distributed.HDR_COOKIE_LEN
cookie = cookie[1:Distributed.HDR_COOKIE_LEN]
end

try
if rank == 0
Base.cluster_cookie(cookie)
Distributed.cluster_cookie(cookie)
MPI.Barrier(comm)
addprocs(manager)
@assert nprocs() == N
Expand All @@ -134,14 +136,14 @@ function start_window_worker(comm::Comm, workers_wait)
redirect_to_mpi(WindowWriter(manager.stdio_windows[rank+1], 0))
for i in vcat([1], (rank+2):N)
# Receiving end of connections to all higher workers and master
Base.process_messages(manager.connection_windows[i], WindowWriter(manager.connection_windows[rank+1], i-1))
Distributed.process_messages(manager.connection_windows[i], WindowWriter(manager.connection_windows[rank+1], i-1))
end

global _stop_requested = Condition()
wait_for_events()
end
catch e
Base.display_error(STDERR,"exception $e on rank $rank",backtrace())
Base.display_error(stderr,"exception $e on rank $rank",backtrace())
end

if workers_wait && rank != 0
Expand Down
16 changes: 8 additions & 8 deletions src/window-io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ mutable struct WindowIO <: IO
waiter

function WindowIO(comm=MPI.COMM_WORLD, bufsize=1024^2)
buffer = Array{UInt8,1}(bufsize)
buffer = Array{UInt8,1}(undef, bufsize)
header_win = MPI.Win()
header = BufferHeader(0, MPI.Get_address(buffer), bufsize, bufsize)
remote_header = BufferHeader(0, MPI.Get_address(buffer), bufsize, bufsize)
Expand Down Expand Up @@ -140,7 +140,7 @@ mutable struct WindowWriter <: IO
nb_written::Int

function WindowWriter(w::WindowIO, target::Integer)
return new(w, target, Vector{UInt8}(1024^2), ReentrantLock(), 0)
return new(w, target, Vector{UInt8}(undef,1024^2), ReentrantLock(), 0)
end
end

Expand All @@ -164,7 +164,7 @@ Base.isreadable(::WindowWriter) = false
function Base.close(w::WindowIO)
w.is_open = false
notify(w.read_requested)
wait(w.waiter) # Wait for the data notification loop to finish
fetch(w.waiter) # Wait for the data notification loop to finish
MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win)
w.header.count = 0
w.ptr = 0
Expand Down Expand Up @@ -211,18 +211,18 @@ function Base.readbytes!(w::WindowIO, b::AbstractVector{UInt8}, nb=length(b); al
if nb_read == 0
return 0
end
copy!(b, 1, w.buffer, w.ptr+1, nb_read)
copyto!(b, 1, w.buffer, w.ptr+1, nb_read)
w.ptr += nb_read
complete_read(w)
return nb_read
end

Base.readavailable(w::WindowIO) = read!(w, Vector{UInt8}(nb_available(w)))
Base.readavailable(w::WindowIO) = read!(w, Vector{UInt8}(undef,nb_available(w)))

@inline function Base.unsafe_read(w::WindowIO, p::Ptr{UInt8}, nb::UInt)
nb_obtained = wait_nb_available(w,nb)
nb_read = min(nb_obtained, nb)
unsafe_copy!(p, pointer(w.buffer, w.ptr+1), nb_read)
unsafe_copyto!(p, pointer(w.buffer, w.ptr+1), nb_read)
w.ptr += nb_read
complete_read(w)
if nb_read != nb
Expand All @@ -232,7 +232,7 @@ Base.readavailable(w::WindowIO) = read!(w, Vector{UInt8}(nb_available(w)))
end

function Base.read(w::WindowIO, nb::Integer; all::Bool=true)
buf = Vector{UInt8}(nb)
buf = Vector{UInt8}(undef,nb)
readbytes!(w, buf, nb, all=all)
return buf
end
Expand All @@ -253,7 +253,7 @@ function Base.unsafe_write(w::WindowWriter, p::Ptr{UInt8}, nb::UInt)
offset = w.nb_written+1
w.nb_written += nb
ensureroom(w)
copy!(w.write_buffer, offset, unsafe_wrap(Array{UInt8}, p, nb), 1, nb)
copyto!(w.write_buffer, offset, unsafe_wrap(Array{UInt8}, p, nb), 1, nb)
return nb
end

Expand Down
10 changes: 6 additions & 4 deletions test/test_windowcman.jl
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
using Distributed

const using_mpi = (nprocs() == 1)

if using_mpi
using MPI
using Base.Test
using Test
transport_mode = MPI.MPI_WINDOW_IO # This test can run with MPPI_TRANSPORT_ALL and MPI_WINDOW_IO
mgr = MPI.start_main_loop(transport_mode)
@everywhere const comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)
else
@everywhere using Base.Test
@everywhere using Test
end

@everywhere const N = nprocs()
Expand All @@ -23,7 +25,7 @@ if N > 1
@test workers() == collect(2:N)
end

results = Vector{Any}(N)
results = Vector{Any}(undef,N)
for i in 1:N
results[i] = remotecall(myid, i)
end
Expand Down Expand Up @@ -52,7 +54,7 @@ has_arrays = true
try
@everywhere using DistributedArrays
catch e
has_arrays = false
global has_arrays = false
end

if has_arrays
Expand Down
3 changes: 2 additions & 1 deletion test/test_windowcman_nowait.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Distributed
using MPI
using Base.Test
using Test

mgr = MPI.start_main_loop(MPI_WINDOW_NOWAIT)

Expand Down
18 changes: 9 additions & 9 deletions test/test_windowio.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Base.Test
using Test
using MPI

MPI.Init()
Expand Down Expand Up @@ -33,7 +33,7 @@ if rank == 1
flush(writer) # Must flush to trigger communication
elseif rank == 0
@test MPI.wait_nb_available(winio) == 5
arr = Vector{UInt8}(5)
arr = Vector{UInt8}(undef,5)
readbytes!(winio, arr)
@test String(arr) == "hello"
@test nb_available(winio) == 0
Expand All @@ -46,7 +46,7 @@ if rank == 1
flush(writer) # Must flush to trigger communication
elseif rank == 0
@test MPI.wait_nb_available(winio) == 5
arr = Vector{UInt8}(5)
arr = Vector{UInt8}(undef,5)
unsafe_read(winio, pointer(arr), 2)
@test nb_available(winio) == 3
unsafe_read(winio, pointer(arr,3), 3)
Expand All @@ -60,10 +60,10 @@ if rank == 1
write(writer, "hello")
flush(writer) # Must flush to trigger communication
elseif rank == 0
arr = Vector{UInt8}(3)
arr = Vector{UInt8}(undef,3)
readbytes!(winio, arr) # waits for data
@test nb_available(winio) == 2
@test String(arr) == "hel"
@test String(copy(arr)) == "hel"
fill!(arr, UInt8('a'))
readbytes!(winio, arr, all=false) # reads what's available
@test String(arr) == "loa"
Expand Down Expand Up @@ -121,7 +121,7 @@ if rank == 1
@time flush(writer)
MPI.Barrier(comm)
elseif rank == 0
const recarr = Vector{UInt8}(100000)
const recarr = Vector{UInt8}(undef,100000)
MPI.Barrier(comm)

println("read timings:")
Expand All @@ -141,12 +141,12 @@ if rank != 0
println(writer, message*string(rank))
flush(writer) # Must flush to trigger communication
else
nb_received = 0
global nb_received = 0
while nb_received != N-1
line = readline(winio)
@test startswith(line, message)
@test line[length(message)+1:end] string.(1:N)
nb_received += 1
global nb_received += 1
end
end

Expand All @@ -162,7 +162,7 @@ if rank != 0
write(writer2, rank*ones(BS))
flush(writer2) # Must flush to trigger communication
else
result = Vector{Float64}(BS*(N-1))
result = Vector{Float64}(undef,BS*(N-1))
read!(winio2, result)
header = winio2.header
@test nb_available(winio2) == 0
Expand Down

0 comments on commit 5cb223f

Please sign in to comment.