From 5cb223f7da7e578d6b43cbd2a120406eae5036ef Mon Sep 17 00:00:00 2001 From: Bart Janssens Date: Sun, 5 Aug 2018 10:32:10 +0100 Subject: [PATCH] Update for Julia v0.7 --- src/window-cman.jl | 22 ++++++++++++---------- src/window-io.jl | 16 ++++++++-------- test/test_windowcman.jl | 10 ++++++---- test/test_windowcman_nowait.jl | 3 ++- test/test_windowio.jl | 18 +++++++++--------- 5 files changed, 37 insertions(+), 32 deletions(-) diff --git a/src/window-cman.jl b/src/window-cman.jl index 808c7915e..6a4f8e29d 100644 --- a/src/window-cman.jl +++ b/src/window-cman.jl @@ -1,4 +1,6 @@ -import Base: launch, kill, manage, connect +import Base: kill +using Distributed +import Distributed: launch, kill, manage, connect export MPIWindowIOManager, launch, kill, manage, connect, @cluster """ @@ -12,8 +14,8 @@ mutable struct MPIWindowIOManager <: ClusterManager function MPIWindowIOManager(comm::MPI.Comm, workers_wait::Bool) nb_procs = MPI.Comm_size(comm) - connection_windows = Vector{WindowIO}(nb_procs) - stdio_windows = Vector{WindowIO}(nb_procs) + connection_windows = Vector{WindowIO}(undef,nb_procs) + stdio_windows = Vector{WindowIO}(undef,nb_procs) for i in 1:nb_procs connection_windows[i] = WindowIO(comm) @@ -62,7 +64,7 @@ function connect(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig) myrank = MPI.Comm_rank(mgr.comm) if myrank == 0 proc_stdio = mgr.stdio_windows[pid] - @schedule while !eof(proc_stdio) + @async while !eof(proc_stdio) try println("\tFrom worker $(pid):\t$(readline(proc_stdio))") catch e @@ -74,7 +76,7 @@ end function redirect_to_mpi(s::WindowWriter) (rd, wr) = redirect_stdout() - @schedule while !eof(rd) && isopen(s.winio) + @async while !eof(rd) && isopen(s.winio) av = readline(rd) if isopen(s.winio) println(s,av) @@ -112,13 +114,13 @@ function start_window_worker(comm::Comm, workers_wait) manager = MPIWindowIOManager(comm, workers_wait) cookie = string(comm) - if length(cookie) > Base.Distributed.HDR_COOKIE_LEN - cookie = cookie[1:Base.Distributed.HDR_COOKIE_LEN] + if length(cookie) > Distributed.HDR_COOKIE_LEN + cookie = cookie[1:Distributed.HDR_COOKIE_LEN] end try if rank == 0 - Base.cluster_cookie(cookie) + Distributed.cluster_cookie(cookie) MPI.Barrier(comm) addprocs(manager) @assert nprocs() == N @@ -134,14 +136,14 @@ function start_window_worker(comm::Comm, workers_wait) redirect_to_mpi(WindowWriter(manager.stdio_windows[rank+1], 0)) for i in vcat([1], (rank+2):N) # Receiving end of connections to all higher workers and master - Base.process_messages(manager.connection_windows[i], WindowWriter(manager.connection_windows[rank+1], i-1)) + Distributed.process_messages(manager.connection_windows[i], WindowWriter(manager.connection_windows[rank+1], i-1)) end global _stop_requested = Condition() wait_for_events() end catch e - Base.display_error(STDERR,"exception $e on rank $rank",backtrace()) + Base.display_error(stderr,"exception $e on rank $rank",backtrace()) end if workers_wait && rank != 0 diff --git a/src/window-io.jl b/src/window-io.jl index 930e5af59..ecd26b875 100644 --- a/src/window-io.jl +++ b/src/window-io.jl @@ -38,7 +38,7 @@ mutable struct WindowIO <: IO waiter function WindowIO(comm=MPI.COMM_WORLD, bufsize=1024^2) - buffer = Array{UInt8,1}(bufsize) + buffer = Array{UInt8,1}(undef, bufsize) header_win = MPI.Win() header = BufferHeader(0, MPI.Get_address(buffer), bufsize, bufsize) remote_header = BufferHeader(0, MPI.Get_address(buffer), bufsize, bufsize) @@ -140,7 +140,7 @@ mutable struct WindowWriter <: IO nb_written::Int function WindowWriter(w::WindowIO, target::Integer) - return new(w, target, Vector{UInt8}(1024^2), ReentrantLock(), 0) + return new(w, target, Vector{UInt8}(undef,1024^2), ReentrantLock(), 0) end end @@ -164,7 +164,7 @@ Base.isreadable(::WindowWriter) = false function Base.close(w::WindowIO) w.is_open = false notify(w.read_requested) - wait(w.waiter) # Wait for the data notification loop to finish + fetch(w.waiter) # Wait for the data notification loop to finish MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win) w.header.count = 0 w.ptr = 0 @@ -211,18 +211,18 @@ function Base.readbytes!(w::WindowIO, b::AbstractVector{UInt8}, nb=length(b); al if nb_read == 0 return 0 end - copy!(b, 1, w.buffer, w.ptr+1, nb_read) + copyto!(b, 1, w.buffer, w.ptr+1, nb_read) w.ptr += nb_read complete_read(w) return nb_read end -Base.readavailable(w::WindowIO) = read!(w, Vector{UInt8}(nb_available(w))) +Base.readavailable(w::WindowIO) = read!(w, Vector{UInt8}(undef,nb_available(w))) @inline function Base.unsafe_read(w::WindowIO, p::Ptr{UInt8}, nb::UInt) nb_obtained = wait_nb_available(w,nb) nb_read = min(nb_obtained, nb) - unsafe_copy!(p, pointer(w.buffer, w.ptr+1), nb_read) + unsafe_copyto!(p, pointer(w.buffer, w.ptr+1), nb_read) w.ptr += nb_read complete_read(w) if nb_read != nb @@ -232,7 +232,7 @@ Base.readavailable(w::WindowIO) = read!(w, Vector{UInt8}(nb_available(w))) end function Base.read(w::WindowIO, nb::Integer; all::Bool=true) - buf = Vector{UInt8}(nb) + buf = Vector{UInt8}(undef,nb) readbytes!(w, buf, nb, all=all) return buf end @@ -253,7 +253,7 @@ function Base.unsafe_write(w::WindowWriter, p::Ptr{UInt8}, nb::UInt) offset = w.nb_written+1 w.nb_written += nb ensureroom(w) - copy!(w.write_buffer, offset, unsafe_wrap(Array{UInt8}, p, nb), 1, nb) + copyto!(w.write_buffer, offset, unsafe_wrap(Array{UInt8}, p, nb), 1, nb) return nb end diff --git a/test/test_windowcman.jl b/test/test_windowcman.jl index 62a0e4416..14136f2e7 100644 --- a/test/test_windowcman.jl +++ b/test/test_windowcman.jl @@ -1,15 +1,17 @@ +using Distributed + const using_mpi = (nprocs() == 1) if using_mpi using MPI - using Base.Test + using Test transport_mode = MPI.MPI_WINDOW_IO # This test can run with MPPI_TRANSPORT_ALL and MPI_WINDOW_IO mgr = MPI.start_main_loop(transport_mode) @everywhere const comm = MPI.COMM_WORLD rank = MPI.Comm_rank(comm) size = MPI.Comm_size(comm) else - @everywhere using Base.Test + @everywhere using Test end @everywhere const N = nprocs() @@ -23,7 +25,7 @@ if N > 1 @test workers() == collect(2:N) end -results = Vector{Any}(N) +results = Vector{Any}(undef,N) for i in 1:N results[i] = remotecall(myid, i) end @@ -52,7 +54,7 @@ has_arrays = true try @everywhere using DistributedArrays catch e - has_arrays = false + global has_arrays = false end if has_arrays diff --git a/test/test_windowcman_nowait.jl b/test/test_windowcman_nowait.jl index 344b5a464..ea19039fa 100644 --- a/test/test_windowcman_nowait.jl +++ b/test/test_windowcman_nowait.jl @@ -1,5 +1,6 @@ +using Distributed using MPI -using Base.Test +using Test mgr = MPI.start_main_loop(MPI_WINDOW_NOWAIT) diff --git a/test/test_windowio.jl b/test/test_windowio.jl index 5700c3c7a..0f6bb82ea 100644 --- a/test/test_windowio.jl +++ b/test/test_windowio.jl @@ -1,4 +1,4 @@ -using Base.Test +using Test using MPI MPI.Init() @@ -33,7 +33,7 @@ if rank == 1 flush(writer) # Must flush to trigger communication elseif rank == 0 @test MPI.wait_nb_available(winio) == 5 - arr = Vector{UInt8}(5) + arr = Vector{UInt8}(undef,5) readbytes!(winio, arr) @test String(arr) == "hello" @test nb_available(winio) == 0 @@ -46,7 +46,7 @@ if rank == 1 flush(writer) # Must flush to trigger communication elseif rank == 0 @test MPI.wait_nb_available(winio) == 5 - arr = Vector{UInt8}(5) + arr = Vector{UInt8}(undef,5) unsafe_read(winio, pointer(arr), 2) @test nb_available(winio) == 3 unsafe_read(winio, pointer(arr,3), 3) @@ -60,10 +60,10 @@ if rank == 1 write(writer, "hello") flush(writer) # Must flush to trigger communication elseif rank == 0 - arr = Vector{UInt8}(3) + arr = Vector{UInt8}(undef,3) readbytes!(winio, arr) # waits for data @test nb_available(winio) == 2 - @test String(arr) == "hel" + @test String(copy(arr)) == "hel" fill!(arr, UInt8('a')) readbytes!(winio, arr, all=false) # reads what's available @test String(arr) == "loa" @@ -121,7 +121,7 @@ if rank == 1 @time flush(writer) MPI.Barrier(comm) elseif rank == 0 - const recarr = Vector{UInt8}(100000) + const recarr = Vector{UInt8}(undef,100000) MPI.Barrier(comm) println("read timings:") @@ -141,12 +141,12 @@ if rank != 0 println(writer, message*string(rank)) flush(writer) # Must flush to trigger communication else - nb_received = 0 + global nb_received = 0 while nb_received != N-1 line = readline(winio) @test startswith(line, message) @test line[length(message)+1:end] ∈ string.(1:N) - nb_received += 1 + global nb_received += 1 end end @@ -162,7 +162,7 @@ if rank != 0 write(writer2, rank*ones(BS)) flush(writer2) # Must flush to trigger communication else - result = Vector{Float64}(BS*(N-1)) + result = Vector{Float64}(undef,BS*(N-1)) read!(winio2, result) header = winio2.header @test nb_available(winio2) == 0