Skip to content

Commit

Permalink
Fix flakiness in JimfsAsynchronousFileChannelTest.
Browse files Browse the repository at this point in the history
The tests are racy, and I don't have a good way to fix that. This change just makes the (much) rarer case where the race ends with the channel being closed before the read/write operations get to the point where they check that the channel is open also pass the test. Unfortunately it doesn't differentiate between that case happening once in a thousand times and happening every time (which would be a problem).

Also simplify a bunch of stuff using SettableFuture.
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=112606151
  • Loading branch information
cgdecker committed Jan 20, 2016
1 parent f8576af commit 3992dc1
Showing 1 changed file with 86 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@

import static com.google.common.jimfs.TestUtils.buffer;
import static com.google.common.jimfs.TestUtils.regularFile;
import static com.google.common.truth.Truth.assertThat;
import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.WRITE;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Runnables;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;

import org.junit.Test;
Expand All @@ -44,14 +46,10 @@
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileLock;
import java.nio.file.OpenOption;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
* Tests for {@link JimfsAsynchronousFileChannel}.
Expand Down Expand Up @@ -100,7 +98,7 @@ public void testAsyncChannel() throws Throwable {
}

@Test
public void testClosedChannel() throws IOException, InterruptedException {
public void testClosedChannel() throws Throwable {
RegularFile file = regularFile(15);
ExecutorService executor = Executors.newSingleThreadExecutor();

Expand All @@ -118,7 +116,7 @@ public void testClosedChannel() throws IOException, InterruptedException {
}

@Test
public void testAsyncClose_write() throws IOException, InterruptedException {
public void testAsyncClose_write() throws Throwable {
RegularFile file = regularFile(15);
ExecutorService executor = Executors.newFixedThreadPool(4);

Expand All @@ -127,47 +125,32 @@ public void testAsyncClose_write() throws IOException, InterruptedException {

file.writeLock().lock(); // cause another thread trying to write to block

// future-returning write
Future<Integer> future = channel.write(ByteBuffer.allocate(10), 0);

final CountDownLatch handlerLatch = new CountDownLatch(1);
final AtomicBoolean gotAsyncCloseException = new AtomicBoolean(false);
channel.write(
ByteBuffer.allocate(10), 0, null,
new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
handlerLatch.countDown();
}

@Override
public void failed(Throwable exc, Object attachment) {
gotAsyncCloseException.set(exc instanceof AsynchronousCloseException);
handlerLatch.countDown();
}
});

// give enough time to ensure both writes start blocking
// completion handler write
SettableFuture<Integer> completionHandlerFuture = SettableFuture.create();
channel.write(ByteBuffer.allocate(10), 0, null, setFuture(completionHandlerFuture));

// Despite this 10ms sleep to allow plenty of time, it's possible, though very rare, for a
// race to cause the channel to be closed before the asynchronous calls get to the initial
// check that the channel is open, causing ClosedChannelException to be thrown rather than
// AsynchronousCloseException. This is not a problem in practice, just a quirk of how these
// tests work and that we don't have a way of waiting for the operations to get past that
// check.
Uninterruptibles.sleepUninterruptibly(10, MILLISECONDS);

channel.close();

try {
future.get();
fail();
} catch (ExecutionException expected) {
assertTrue(expected.getCause() instanceof AsynchronousCloseException);
}

handlerLatch.await();

assertTrue(gotAsyncCloseException.get());
assertAsynchronousClose(future);
assertAsynchronousClose(completionHandlerFuture);
} finally {
executor.shutdown();
}
}

@Test
public void testAsyncClose_read() throws IOException, InterruptedException {
public void testAsyncClose_read() throws Throwable {
RegularFile file = regularFile(15);
ExecutorService executor = Executors.newFixedThreadPool(2);

Expand All @@ -176,40 +159,25 @@ public void testAsyncClose_read() throws IOException, InterruptedException {

file.writeLock().lock(); // cause another thread trying to read to block

// future-returning read
Future<Integer> future = channel.read(ByteBuffer.allocate(10), 0);

final CountDownLatch handlerLatch = new CountDownLatch(1);
final AtomicBoolean gotAsyncCloseException = new AtomicBoolean(false);
channel.read(
ByteBuffer.allocate(10), 0, null,
new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
handlerLatch.countDown();
}

@Override
public void failed(Throwable exc, Object attachment) {
gotAsyncCloseException.set(exc instanceof AsynchronousCloseException);
handlerLatch.countDown();
}
});

// give enough time to ensure both reads start blocking
// completion handler read
SettableFuture<Integer> completionHandlerFuture = SettableFuture.create();
channel.read(ByteBuffer.allocate(10), 0, null, setFuture(completionHandlerFuture));

// Despite this 10ms sleep to allow plenty of time, it's possible, though very rare, for a
// race to cause the channel to be closed before the asynchronous calls get to the initial
// check that the channel is open, causing ClosedChannelException to be thrown rather than
// AsynchronousCloseException. This is not a problem in practice, just a quirk of how these
// tests work and that we don't have a way of waiting for the operations to get past that
// check.
Uninterruptibles.sleepUninterruptibly(10, MILLISECONDS);

channel.close();

try {
future.get();
fail();
} catch (ExecutionException expected) {
assertTrue(expected.getCause() instanceof AsynchronousCloseException);
}

handlerLatch.await();

assertTrue(gotAsyncCloseException.get());
assertAsynchronousClose(future);
assertAsynchronousClose(completionHandlerFuture);
} finally {
executor.shutdown();
}
Expand All @@ -220,105 +188,78 @@ private static void checkAsyncRead(AsynchronousFileChannel channel) throws Throw
assertEquals(10, (int) channel.read(buf, 0).get());

buf.flip();
final AtomicInteger resultHolder = new AtomicInteger(-1);
final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>();
final CountDownLatch completionLatch = new CountDownLatch(1);
channel.read(
buf, 0, null,
new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
resultHolder.set(result);
completionLatch.countDown();
}

@Override
public void failed(Throwable exc, Object attachment) {
exceptionHolder.set(exc);
completionLatch.countDown();
}
});

completionLatch.await();
Throwable exception = exceptionHolder.get();
if (exception != null) {
throw exception;
} else {
assertEquals(10, resultHolder.get());
}

SettableFuture<Integer> future = SettableFuture.create();
channel.read(buf, 0, null, setFuture(future));

assertThat(future.get(10, SECONDS)).isEqualTo(10);
}

private static void checkAsyncWrite(AsynchronousFileChannel asyncChannel) throws Throwable {
ByteBuffer buf = buffer("1234567890");
assertEquals(10, (int) asyncChannel.write(buf, 0).get());

buf.flip();
final AtomicInteger resultHolder = new AtomicInteger(-1);
final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>();
final CountDownLatch completionLatch = new CountDownLatch(1);
asyncChannel.write(
buf, 0, null,
new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
resultHolder.set(result);
completionLatch.countDown();
}

@Override
public void failed(Throwable exc, Object attachment) {
exceptionHolder.set(exc);
completionLatch.countDown();
}
});

completionLatch.await();
Throwable exception = exceptionHolder.get();
if (exception != null) {
throw exception;
} else {
assertEquals(10, resultHolder.get());
}
SettableFuture<Integer> future = SettableFuture.create();
asyncChannel.write(buf, 0, null, setFuture(future));

assertThat(future.get(10, SECONDS)).isEqualTo(10);
}

private static void checkAsyncLock(AsynchronousFileChannel channel) throws Throwable {
assertNotNull(channel.lock().get());
assertNotNull(channel.lock(0, 10, true).get());

final AtomicReference<FileLock> lockHolder = new AtomicReference<>();
final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>();
final CountDownLatch completionLatch = new CountDownLatch(1);
channel.lock(
0, 10, true, null,
new CompletionHandler<FileLock, Object>() {
@Override
public void completed(FileLock result, Object attachment) {
lockHolder.set(result);
completionLatch.countDown();
}

@Override
public void failed(Throwable exc, Object attachment) {
exceptionHolder.set(exc);
completionLatch.countDown();
}
});

completionLatch.await();
Throwable exception = exceptionHolder.get();
if (exception != null) {
throw exception;
} else {
assertNotNull(lockHolder.get());
SettableFuture<FileLock> future = SettableFuture.create();
channel.lock(0, 10, true, null, setFuture(future));

assertNotNull(future.get(10, SECONDS));
}

/**
* Returns a {@code CompletionHandler} that sets the appropriate result or exception on the given
* {@code future} on completion.
*/
private static <T> CompletionHandler<T, Object> setFuture(final SettableFuture<T> future) {
return new CompletionHandler<T, Object>() {
@Override
public void completed(T result, Object attachment) {
future.set(result);
}

@Override
public void failed(Throwable exc, Object attachment) {
future.setException(exc);
}
};
}

/**
* Assert that the future fails, with the failure caused by {@code ClosedChannelException}.
*/
private static void assertClosed(Future<?> future) throws Throwable {
try {
future.get(10, SECONDS);
fail("ChannelClosedException was not thrown");
} catch (ExecutionException expected) {
assertThat(expected.getCause()).isInstanceOf(ClosedChannelException.class);
}
}

private static void assertClosed(Future<?> future) throws InterruptedException {
/**
* Assert that the future fails, with the failure caused by either
* {@code AsynchronousCloseException} or (rarely) {@code ClosedChannelException}.
*/
private static void assertAsynchronousClose(Future<?> future) throws Throwable {
try {
future.get();
fail();
future.get(10, SECONDS);
fail("no exception was thrown");
} catch (ExecutionException expected) {
assertTrue(expected.getCause() instanceof ClosedChannelException);
Throwable t = expected.getCause();
if (!(t instanceof AsynchronousCloseException || t instanceof ClosedChannelException)) {
fail("expected AsynchronousCloseException (or in rare cases ClosedChannelException): "
+ "got " + t);
}
}
}
}

0 comments on commit 3992dc1

Please sign in to comment.