Skip to content

Commit

Permalink
Simplify wait_nb_available in WindowIO
Browse files Browse the repository at this point in the history
  • Loading branch information
barche committed Aug 2, 2018
1 parent 2e007d4 commit f9f2233
Showing 1 changed file with 11 additions and 16 deletions.
27 changes: 11 additions & 16 deletions src/window-io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ mutable struct WindowIO <: IO
header::BufferHeader
remote_header::BufferHeader
header_win::Win
header_cwin::CWin
is_open::Bool
# Current read position
ptr::WinCountT
Expand All @@ -56,7 +55,6 @@ mutable struct WindowIO <: IO
header,
remote_header,
header_win,
CWin(header_win),
true,
0,
Condition(),
Expand Down Expand Up @@ -92,23 +90,23 @@ function has_data_available(w::WindowIO)
return false
end

if w.header.count > w.ptr && w.header.needed_length == w.header.length # fast check without window sync
return true
end

# Check if we need to grow the buffer
MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win)
MPI.Win_sync(w.header_cwin) # CWin version doesn't allocate
if w.header.needed_length > w.header.length
MPI.Win_lock(MPI.LOCK_SHARED, w.myrank, 0, w.header_win)
have_data = w.header.count > w.ptr
need_grow = w.header.needed_length > w.header.length
MPI.Win_unlock(w.myrank, w.header_win)

# Grow buffer if needed
if need_grow
MPI.Win_detach(w.win, w.buffer)
resize!(w.buffer, w.header.needed_length)
MPI.Win_attach(w.win, w.buffer)
MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win)
w.header.address = MPI.Get_address(w.buffer)
w.header.length = w.header.needed_length
MPI.Win_unlock(w.myrank, w.header_win)
end
MPI.Win_unlock(w.myrank, w.header_win)

return w.header.count > w.ptr
return have_data
end

function Base.wait(w::WindowIO)
Expand All @@ -126,11 +124,8 @@ end

# wait until the specified number of bytes is available or the stream is closed
function wait_nb_available(w, nb)
nb_found = wait_nb_available(w)
nb_found = 0
while nb_found < nb && w.is_open
MPI.Win_lock(MPI.LOCK_SHARED, w.myrank, 0, w.header_win)
MPI.Win_sync(w.header_cwin) # sync every loop, to make sure we get updates
MPI.Win_unlock(w.myrank, w.header_win)
nb_found = wait_nb_available(w)
end
return nb_found
Expand Down

0 comments on commit f9f2233

Please sign in to comment.