From f9f2233ef9b768606dedc3acde01dc175fc8dfff Mon Sep 17 00:00:00 2001 From: Bart Janssens Date: Mon, 26 Feb 2018 08:05:48 +0100 Subject: [PATCH] Simplify wait_nb_available in WindowIO --- src/window-io.jl | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/src/window-io.jl b/src/window-io.jl index d40e61e14..930e5af59 100644 --- a/src/window-io.jl +++ b/src/window-io.jl @@ -29,7 +29,6 @@ mutable struct WindowIO <: IO header::BufferHeader remote_header::BufferHeader header_win::Win - header_cwin::CWin is_open::Bool # Current read position ptr::WinCountT @@ -56,7 +55,6 @@ mutable struct WindowIO <: IO header, remote_header, header_win, - CWin(header_win), true, 0, Condition(), @@ -92,23 +90,23 @@ function has_data_available(w::WindowIO) return false end - if w.header.count > w.ptr && w.header.needed_length == w.header.length # fast check without window sync - return true - end - - # Check if we need to grow the buffer - MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win) - MPI.Win_sync(w.header_cwin) # CWin version doesn't allocate - if w.header.needed_length > w.header.length + MPI.Win_lock(MPI.LOCK_SHARED, w.myrank, 0, w.header_win) + have_data = w.header.count > w.ptr + need_grow = w.header.needed_length > w.header.length + MPI.Win_unlock(w.myrank, w.header_win) + + # Grow buffer if needed + if need_grow MPI.Win_detach(w.win, w.buffer) resize!(w.buffer, w.header.needed_length) MPI.Win_attach(w.win, w.buffer) + MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win) w.header.address = MPI.Get_address(w.buffer) w.header.length = w.header.needed_length + MPI.Win_unlock(w.myrank, w.header_win) end - MPI.Win_unlock(w.myrank, w.header_win) - return w.header.count > w.ptr + return have_data end function Base.wait(w::WindowIO) @@ -126,11 +124,8 @@ end # wait until the specified number of bytes is available or the stream is closed function wait_nb_available(w, nb) - nb_found = wait_nb_available(w) + nb_found = 0 while nb_found < nb && w.is_open - MPI.Win_lock(MPI.LOCK_SHARED, w.myrank, 0, w.header_win) - MPI.Win_sync(w.header_cwin) # sync every loop, to make sure we get updates - MPI.Win_unlock(w.myrank, w.header_win) nb_found = wait_nb_available(w) end return nb_found