diff --git a/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java b/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java new file mode 100644 index 00000000..26245d05 --- /dev/null +++ b/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java @@ -0,0 +1,431 @@ +package com.squareup.tape2; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * This is a simple and unbounded {@link BlockingQueue} implementation as a wrapper + * around a {@link ObjectQueue}. Thread safety is implemented using a single lock around all + * operations to the backing {@link ObjectQueue}. + * + * @param the element type + */ +public final class BlockingObjectQueue implements BlockingQueue, Closeable { + + private final Lock lock = new ReentrantLock(); + + private final Condition nonEmpty = lock.newCondition(); + + private final ObjectQueue queue; + + public BlockingObjectQueue(ObjectQueue queue) { + this.queue = queue; + } + + public ObjectQueue queue() { + return queue; + } + + /** + * Creates a new {@link BlockingQueue} of type {@code T} backed by the given + * {@link ObjectQueue} of type {@code T}. + * + * @param qf the queue file which should not be shared to other places + * @param conv the converter used to convert from and to byte arrays + * @param the element type + * @return a BlockObjectQueue implementation + */ + public static BlockingObjectQueue create(QueueFile qf, ObjectQueue.Converter conv) { + return new BlockingObjectQueue<>(new FileObjectQueue<>(qf, conv)); + } + + /** + * Creates a new {@link BlockingQueue} of type {@code T} backed by the given + * {@link ObjectQueue} of type {@code T}. + * + * @param qf the queue file which should not be shared to other places + * @return a BlockObjectQueue implementation + */ + public static BlockingObjectQueue create(QueueFile qf) { + return new BlockingObjectQueue<>(qf); + } + + @Override public boolean add(E element) { + lock.lock(); + try { + queue.add(element); + nonEmpty.signal(); + return true; + } catch (IOException e) { + throw QueueFile.getSneakyThrowable(e); + } finally { + lock.unlock(); + } + } + + @Override public boolean offer(E element) { + return add(element); + } + + @Override public void put(E element) { + add(element); + } + + @Override public boolean offer(E element, long timeout, TimeUnit unit) { + return add(element); + } + + @Override public E take() throws InterruptedException { + lock.lock(); + try { + while (queue.isEmpty()) { + nonEmpty.await(); + } + E peek = queue.peek(); + if (peek == null) { + // this won't happen unless the backing queue has been shared. + throw new ConcurrentModificationException(); + } + queue.remove(); + return peek; + } catch (IOException e) { + throw QueueFile.getSneakyThrowable(e); + } finally { + lock.unlock(); + } + } + + @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException { + lock.lock(); + try { + long timeoutNanos = unit.toNanos(timeout); + while (queue.isEmpty() && timeoutNanos > 0) { + timeoutNanos = nonEmpty.awaitNanos(timeoutNanos); + } + E peek = queue.peek(); + if (peek == null) { + throw new NoSuchElementException(); + } + queue.remove(); + return peek; + } catch (IOException e) { + throw QueueFile.getSneakyThrowable(e); + } finally { + lock.unlock(); + } + } + + @Override public int remainingCapacity() { + return Integer.MAX_VALUE; // as per BlockingQueue javadoc for unbounded queues + } + + /** + * The underlying {@link ObjectQueue} only supports removing the head, so this will only work if + * head matches. + * */ + @Override public boolean remove(Object o) { + lock.lock(); + try { + if (queue.isEmpty()) { + return false; + } + Iterator it = queue.iterator(); + while (it.hasNext()) { + if (Objects.deepEquals(it.next(), o)) { + it.remove(); + return true; + } + } + return false; + } finally { + lock.unlock(); + } + } + + @Override public boolean contains(Object o) { + lock.lock(); + try { + for (E entry : queue) { + if (Objects.deepEquals(entry, o)) { + return true; + } + } + return false; + } finally { + lock.unlock(); + } + } + + @Override public int drainTo(Collection c) { + lock.lock(); + try { + return drainTo(c, size()); + } finally { + lock.unlock(); + } + } + + @Override public int drainTo(Collection c, int maxElements) { + if (maxElements == 0) { + return 0; + } + lock.lock(); + try { + Iterator it = queue.iterator(); + int i = 0; + while (it.hasNext() && i < maxElements) { + c.add(it.next()); + it.remove(); + i++; + } + return i; + } finally { + lock.unlock(); + } + } + + @Override public E remove() { + lock.lock(); + try { + if (queue.isEmpty()) { + throw new NoSuchElementException(); + } + E peek = queue.peek(); + queue.remove(); + return peek; + } catch (IOException e) { + throw QueueFile.getSneakyThrowable(e); + } finally { + lock.unlock(); + } + } + + @Override public E poll() { + lock.lock(); + try { + if (queue.isEmpty()) { + return null; + } + E peek = queue.peek(); + queue.remove(); + return peek; + } catch (IOException e) { + throw QueueFile.getSneakyThrowable(e); + } finally { + lock.unlock(); + } + } + + @Override public E element() { + lock.lock(); + try { + if (queue.isEmpty()) { + throw new NoSuchElementException(); + } + return queue.peek(); + } catch (IOException e) { + throw QueueFile.getSneakyThrowable(e); + } finally { + lock.unlock(); + } + } + + @Override public E peek() { + lock.lock(); + try { + return queue.peek(); + } catch (IOException e) { + throw QueueFile.getSneakyThrowable(e); + } finally { + lock.unlock(); + } + } + + /** + * This overload is an addition to the {@link BlockingQueue} interface similar to the + * {@link #poll(long, TimeUnit)} method, a blocking peek operation with a timeout. + * + * @param timeout the timeout + * @param unit the time unit of the timeout + * @return the head of this queue, or {@code null} if this queue is empty + * @see #peek() for more information + * @throws InterruptedException if interrupted while waiting + */ + public E peek(long timeout, TimeUnit unit) throws InterruptedException { + lock.lock(); + try { + long timeoutNanos = unit.toNanos(timeout); + while (queue.isEmpty() && timeoutNanos > 0) { + timeoutNanos = nonEmpty.awaitNanos(timeoutNanos); + } + E peek = queue.peek(); + if (peek == null) { + throw new NoSuchElementException(); + } + return peek; + } catch (IOException e) { + throw QueueFile.getSneakyThrowable(e); + } finally { + lock.unlock(); + } + } + + @Override public int size() { + lock.lock(); + try { + return queue.size(); + } finally { + lock.unlock(); + } + } + + @Override public boolean isEmpty() { + lock.lock(); + try { + return queue.isEmpty(); + } finally { + lock.unlock(); + } + } + + @Override public Iterator iterator() { + return queue.iterator(); + } + + @Override public Object[] toArray() { + lock.lock(); + try { + Object[] out = new Object[queue.size()]; + int i = 0; + for (E e : queue) { + out[i++] = e; + } + return out; + } finally { + lock.unlock(); + } + } + + @Override @SuppressWarnings("unchecked") public T[] toArray(T[] a) { + return (T[]) toArray(); + } + + @Override public boolean containsAll(Collection c) { + lock.lock(); + try { + for (Object e : c) { + if (!contains(e)) { + return false; + } + } + return true; + } finally { + lock.unlock(); + } + } + + @Override public boolean addAll(Collection c) { + if (c.isEmpty()) { + return false; + } + lock.lock(); + try { + for (E e : c) { + queue.add(e); + } + nonEmpty.signal(); + return true; + } catch (IOException e) { + throw QueueFile.getSneakyThrowable(e); + } finally { + lock.unlock(); + } + } + + /** + * The underlying {@link ObjectQueue} only supports removing the head, so this will only work if + * head matches. + * */ + @Override public boolean removeAll(Collection c) { + lock.lock(); + boolean changed = false; + try { + for (Object e : c) { + if (remove(e)) { + changed = true; + } + } + return changed; + } finally { + lock.unlock(); + } + } + + private static boolean contains(Iterable haystack, Object needle) { + for (Object o : haystack) { + if (Objects.deepEquals(o, needle)) { + return true; + } + } + return false; + } + + /** + * The underlying {@link ObjectQueue} only supports removing the head, so this will only work if + * head matches. + * */ + @Override public boolean retainAll(Collection c) { + lock.lock(); + if (c.isEmpty()) { + if (queue.isEmpty()) { + return false; + } else { + clear(); + return true; + } + } + try { + boolean changed = false; + Iterator it = queue.iterator(); + while (it.hasNext()) { + if (!contains(c, it.next())) { + it.remove(); + changed = true; + } + } + return changed; + } finally { + lock.unlock(); + } + } + + @Override public void clear() { + lock.lock(); + try { + queue.clear(); + } catch (IOException e) { + throw QueueFile.getSneakyThrowable(e); + } finally { + lock.unlock(); + } + } + + @Override public void close() throws IOException { + lock.lock(); + try { + queue.close(); + } finally { + lock.unlock(); + } + } +} diff --git a/tape/src/main/java/com/squareup/tape2/FileObjectQueue.java b/tape/src/main/java/com/squareup/tape2/FileObjectQueue.java index d87c9718..84df0d6d 100644 --- a/tape/src/main/java/com/squareup/tape2/FileObjectQueue.java +++ b/tape/src/main/java/com/squareup/tape2/FileObjectQueue.java @@ -19,7 +19,7 @@ final class FileObjectQueue extends ObjectQueue { this.converter = converter; } - @Override public @Nonnull QueueFile file() { + public @Nonnull QueueFile file() { return queueFile; } diff --git a/tape/src/main/java/com/squareup/tape2/InMemoryObjectQueue.java b/tape/src/main/java/com/squareup/tape2/InMemoryObjectQueue.java index 5f983d81..d59c2a1b 100644 --- a/tape/src/main/java/com/squareup/tape2/InMemoryObjectQueue.java +++ b/tape/src/main/java/com/squareup/tape2/InMemoryObjectQueue.java @@ -25,10 +25,6 @@ final class InMemoryObjectQueue extends ObjectQueue { entries = new ArrayDeque<>(); } - @Override public @Nullable QueueFile file() { - return null; - } - @Override public void add(T entry) { if (closed) throw new IllegalStateException("closed"); modCount++; diff --git a/tape/src/main/java/com/squareup/tape2/ObjectQueue.java b/tape/src/main/java/com/squareup/tape2/ObjectQueue.java index 560a8dd7..eb284864 100644 --- a/tape/src/main/java/com/squareup/tape2/ObjectQueue.java +++ b/tape/src/main/java/com/squareup/tape2/ObjectQueue.java @@ -8,6 +8,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; import javax.annotation.Nullable; /** A queue of objects. */ @@ -25,9 +26,6 @@ public static ObjectQueue createInMemory() { return new InMemoryObjectQueue<>(); } - /** The underlying {@link QueueFile} backing this queue, or null if it's only in memory. */ - public abstract @Nullable QueueFile file(); - /** Returns the number of entries in the queue. */ public abstract int size(); @@ -65,7 +63,11 @@ public List asList() throws IOException { return peek(size()); } - /** Removes the head of the queue. */ + /** + * Removes the eldest element (the head) of the queue. + * + * @throws NoSuchElementException if the queue is empty + */ public void remove() throws IOException { remove(1); } diff --git a/tape/src/main/java/com/squareup/tape2/QueueFile.java b/tape/src/main/java/com/squareup/tape2/QueueFile.java index c1cae7b6..efb17c9f 100644 --- a/tape/src/main/java/com/squareup/tape2/QueueFile.java +++ b/tape/src/main/java/com/squareup/tape2/QueueFile.java @@ -15,7 +15,6 @@ */ package com.squareup.tape2; -import java.io.Closeable; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -52,7 +51,7 @@ * * @author Bob Lee (bob@squareup.com) */ -public final class QueueFile implements Closeable, Iterable { +public final class QueueFile extends ObjectQueue { /** Leading bit set to 1 indicating a versioned header and the version of 1. */ private static final int VERSIONED_HEADER = 0x80000001; @@ -358,7 +357,7 @@ private void ringErase(long position, long length) throws IOException { * * @param data to copy bytes from */ - public void add(byte[] data) throws IOException { + @Override public void add(byte[] data) throws IOException { add(data, 0, data.length); } @@ -424,11 +423,6 @@ private long remainingBytes() { return fileLength - usedBytes(); } - /** Returns true if this queue contains no entries. */ - public boolean isEmpty() { - return elementCount == 0; - } - /** * If necessary, expands the file to accommodate an additional element of the given length. * @@ -488,7 +482,7 @@ private void setLength(long newLength) throws IOException { } /** Reads the eldest element. Returns null if the queue is empty. */ - public @Nullable byte[] peek() throws IOException { + @Override public @Nullable byte[] peek() throws IOException { if (closed) throw new IllegalStateException("closed"); if (isEmpty()) return null; int length = first.length; @@ -582,25 +576,16 @@ private void checkForComodification() { } /** Returns the number of elements in this queue. */ - public int size() { + @Override public int size() { return elementCount; } - /** - * Removes the eldest element. - * - * @throws NoSuchElementException if the queue is empty - */ - public void remove() throws IOException { - remove(1); - } - /** * Removes the eldest {@code n} elements. * * @throws NoSuchElementException if the queue is empty */ - public void remove(int n) throws IOException { + @Override public void remove(int n) throws IOException { if (n < 0) { throw new IllegalArgumentException("Cannot remove negative (" + n + ") number of elements."); } @@ -644,7 +629,7 @@ public void remove(int n) throws IOException { } /** Clears this queue. Truncates the file to the initial size. */ - public void clear() throws IOException { + @Override public void clear() throws IOException { if (closed) throw new IllegalStateException("closed"); // Commit the header. diff --git a/tape/src/test/java/com/squareup/tape2/BlockingObjectQueueTest.java b/tape/src/test/java/com/squareup/tape2/BlockingObjectQueueTest.java new file mode 100644 index 00000000..0abaed39 --- /dev/null +++ b/tape/src/test/java/com/squareup/tape2/BlockingObjectQueueTest.java @@ -0,0 +1,486 @@ +package com.squareup.tape2; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class BlockingObjectQueueTest { + + @Rule public TemporaryFolder folder = new TemporaryFolder(); + private File file; + + @Before public void setUp() throws Exception { + File parent = folder.getRoot(); + file = new File(parent, "blocking-queue-file"); + } + + private QueueFile newQueueFile() throws IOException { + return new QueueFile.Builder(file).build(); + } + + private BlockingObjectQueue newQueue() throws IOException { + return BlockingObjectQueue.create(newQueueFile()); + } + + @Test public void add() throws IOException { + BlockingObjectQueue q = newQueue(); + assertEquals(0, q.size()); + q.add(new byte[]{1, 2}); + assertArrayEquals(new byte[]{1, 2}, q.peek()); + assertEquals(1, q.size()); + } + + @Test public void offer() throws IOException { + BlockingObjectQueue q = newQueue(); + assertEquals(0, q.size()); + q.offer(new byte[]{1, 2}); + assertArrayEquals(new byte[]{1, 2}, q.peek()); + assertEquals(1, q.size()); + } + + @Test public void put() throws IOException { + BlockingObjectQueue q = newQueue(); + assertEquals(0, q.size()); + q.put(new byte[]{1, 2}); + assertArrayEquals(new byte[]{1, 2}, q.peek()); + assertEquals(1, q.size()); + } + + @Test public void offer1() throws IOException { + BlockingObjectQueue q = newQueue(); + assertEquals(0, q.size()); + q.offer(new byte[]{1, 2}, 1, TimeUnit.SECONDS); + assertArrayEquals(new byte[]{1, 2}, q.peek()); + assertEquals(1, q.size()); + } + + @Test public void takeEmpty() throws IOException, InterruptedException { + final BlockingObjectQueue q = newQueue(); + Thread t = new Thread(new Runnable() { + @Override public void run() { + try { + q.take(); + } catch (InterruptedException ignored) { + return; + } + Assert.fail("The take operation should have been interrupted!"); + } + }); + t.start(); + Thread.sleep(1000); + t.interrupt(); + } + + @Test public void takeNonEmpty() throws IOException, InterruptedException { + BlockingObjectQueue q = newQueue(); + q.put(new byte[]{1, 2}); + assertArrayEquals(new byte[]{1, 2}, q.take()); + } + + @Test public void pollEmpty() throws IOException { + BlockingObjectQueue q = newQueue(); + assertNull(q.poll()); + } + + @Test public void pollNonEmpty() throws IOException { + BlockingObjectQueue q = newQueue(); + q.put(new byte[]{1, 2}); + assertArrayEquals(new byte[]{1, 2}, q.poll()); + } + + @Test public void remainingCapacity() throws IOException { + BlockingObjectQueue q = newQueue(); + assertEquals(0, q.size()); + assertEquals(Integer.MAX_VALUE, q.remainingCapacity()); + q.add(new byte[]{}); + assertEquals(1, q.size()); + assertEquals(Integer.MAX_VALUE, q.remainingCapacity()); + } + + @Test public void remove() throws IOException { + BlockingObjectQueue q = newQueue(); + assertEquals(0, q.size()); + assertTrue(q.add(new byte[]{1})); + assertEquals(1, q.size()); + assertTrue(q.add(new byte[]{2})); + assertEquals(2, q.size()); + assertArrayEquals(new byte[]{1}, q.remove()); + assertEquals(1, q.size()); + assertArrayEquals(new byte[]{2}, q.remove()); + assertEquals(0, q.size()); + + assertFalse(q.remove(null)); + } + + @Test(expected = NoSuchElementException.class) + public void removeEmpty() throws IOException { + BlockingObjectQueue q = newQueue(); + q.remove(); + } + + @Test public void contains() throws IOException { + BlockingObjectQueue q = newQueue(); + assertEquals(0, q.size()); + assertTrue(q.add(new byte[]{1})); + assertEquals(1, q.size()); + assertTrue(q.add(new byte[]{2})); + assertEquals(2, q.size()); + + assertTrue(q.contains(new byte[]{1})); + assertTrue(q.contains(new byte[]{2})); + assertFalse(q.contains(new byte[]{3})); + } + + @Test public void drainTo() throws IOException { + BlockingObjectQueue q = newQueue(); + List ref = Arrays.asList(new byte[]{1}, new byte[]{2}); + assertTrue(q.addAll(ref)); + assertEquals(ref.size(), q.size()); + + List out = new ArrayList<>(); + assertEquals(ref.size(), q.drainTo(out)); + assertEquals(ref.size(), out.size()); + assertTrue(q.isEmpty()); + assertArrayEquals(ref.toArray(), out.toArray()); + } + + @Test public void drainTo1() throws IOException { + BlockingObjectQueue q = newQueue(); + List ref = Arrays.asList(new byte[]{1}, new byte[]{2}, new byte[]{3}); + assertTrue(q.addAll(ref)); + assertEquals(ref.size(), q.size()); + + int drain = ref.size() - 1; + List out = new ArrayList<>(); + assertEquals(drain, q.drainTo(out, drain)); + assertEquals(drain, out.size()); + assertEquals(1, q.size()); + assertArrayEquals(ref.subList(0, drain).toArray(), out.toArray()); + + assertEquals(0, q.drainTo(out, 0)); + } + + @Test public void drainTo1TooSmall() throws IOException { + BlockingObjectQueue q = newQueue(); + List ref = Arrays.asList(new byte[]{1}, new byte[]{2}); + assertTrue(q.addAll(ref)); + assertEquals(ref.size(), q.size()); + + List out = new ArrayList<>(); + assertEquals(ref.size(), q.drainTo(out, Integer.MAX_VALUE)); + assertEquals(ref.size(), out.size()); + assertTrue(q.isEmpty()); + assertArrayEquals(ref.toArray(), out.toArray()); + } + + @Test public void remove1() throws IOException { + byte[] o1 = {1}; + byte[] o2 = {2}; + + BlockingObjectQueue q = newQueue(); + assertEquals(0, q.size()); + assertTrue(q.add(o1)); + assertEquals(1, q.size()); + assertTrue(q.add(o2)); + assertEquals(2, q.size()); + + assertFalse(q.remove(new byte[]{3})); + assertEquals(2, q.size()); + assertTrue(q.remove(o1)); + assertEquals(1, q.size()); + assertTrue(q.remove(o2)); + assertEquals(0, q.size()); + } + + @Test(expected = UnsupportedOperationException.class) + public void remove1NotHead() throws IOException { + byte[] o1 = {1}; + byte[] o2 = {2}; + + BlockingObjectQueue q = newQueue(); + assertEquals(0, q.size()); + assertTrue(q.add(o1)); + assertEquals(1, q.size()); + assertTrue(q.add(o2)); + assertEquals(2, q.size()); + + assertTrue(q.remove(o2)); + } + + @Test(expected = NoSuchElementException.class) + public void poll1Empty() throws IOException, InterruptedException { + BlockingObjectQueue q = newQueue(); + q.poll(1, TimeUnit.SECONDS); + } + + @Test(timeout = 1000) + public void poll1EmptyAndAdd() throws IOException, InterruptedException { + final BlockingObjectQueue q = newQueue(); + Thread t = new Thread(new Runnable() { + @Override public void run() { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + q.add(new byte[]{1}); + } + }); + t.start(); + assertArrayEquals(new byte[] {1}, q.poll(1, TimeUnit.MINUTES)); + t.join(); + } + + @Test public void poll1NonEmpty() throws IOException, InterruptedException { + BlockingObjectQueue q = newQueue(); + q.put(new byte[]{1, 2}); + assertArrayEquals(new byte[]{1, 2}, q.poll(1, TimeUnit.SECONDS)); + } + + @Test(expected = NoSuchElementException.class) + public void elementEmpty() throws IOException { + BlockingObjectQueue q = newQueue(); + q.element(); + } + + @Test public void elementNonEmpty() throws IOException { + BlockingObjectQueue q = newQueue(); + assertTrue(q.add(new byte[]{1})); + assertEquals(1, q.size()); + assertArrayEquals(new byte[]{1}, q.element()); + } + + @Test public void peekEmpty() throws IOException { + BlockingObjectQueue q = newQueue(); + assertNull(q.peek()); + } + + @Test public void peekNonEmpty() throws IOException { + BlockingObjectQueue q = newQueue(); + assertTrue(q.add(new byte[]{1})); + assertEquals(1, q.size()); + assertArrayEquals(new byte[]{1}, q.peek()); + } + + @Test(expected = NoSuchElementException.class) + public void peek1Empty() throws IOException, InterruptedException { + BlockingObjectQueue q = newQueue(); + q.peek(1, TimeUnit.SECONDS); + } + + @Test public void peek1NonEmpty() throws IOException, InterruptedException { + BlockingObjectQueue q = newQueue(); + assertTrue(q.add(new byte[]{1})); + assertEquals(1, q.size()); + assertArrayEquals(new byte[]{1}, q.peek(1, TimeUnit.SECONDS)); + } + + @Test public void size() throws IOException { + BlockingObjectQueue q = newQueue(); + assertEquals(0, q.size()); + assertTrue(q.add(new byte[]{1})); + assertEquals(1, q.size()); + + assertTrue(q.add(new byte[]{2})); + assertEquals(2, q.size()); + } + + @Test public void isEmpty() throws IOException { + BlockingObjectQueue q = newQueue(); + assertTrue(q.isEmpty()); + assertTrue(q.add(new byte[]{1})); + assertFalse(q.isEmpty()); + assertNotNull(q.remove()); + assertTrue(q.isEmpty()); + } + + @Test public void iterator() throws IOException { + byte[][] ref = {{1}, {2}, {3}}; + BlockingObjectQueue q = newQueue(); + q.addAll(Arrays.asList(ref)); + + int i = 0; + Iterator it = q.iterator(); + while (it.hasNext()) { + assertArrayEquals(ref[i++], it.next()); + } + + assertEquals(ref.length, i); + } + + @Test public void toArray() throws IOException { + byte[][] bytes = {{1}, {2}}; + BlockingObjectQueue q = newQueue(); + assertTrue(q.add(new byte[]{1})); + assertTrue(q.add(new byte[]{2})); + + assertArrayEquals(bytes, q.toArray()); + } + + @Test public void toArray1() throws IOException { + byte[][] bytes = {{1}, {2}}; + BlockingObjectQueue q = newQueue(); + assertTrue(q.add(new byte[]{1})); + assertTrue(q.add(new byte[]{2})); + + assertArrayEquals(bytes, q.toArray(new byte[0][])); + } + + @Test public void containsAll() throws IOException { + List bytes = Arrays.asList(new byte[]{1}, new byte[]{2}); + BlockingObjectQueue q = newQueue(); + assertFalse(q.containsAll(bytes)); + assertTrue(q.add(new byte[]{1})); + assertEquals(1, q.size()); + assertFalse(q.containsAll(bytes)); + assertTrue(q.add(new byte[]{2})); + assertEquals(2, q.size()); + assertTrue(q.containsAll(bytes)); + assertTrue(q.add(new byte[]{3})); + assertEquals(3, q.size()); + assertTrue(q.containsAll(bytes)); + } + + @Test public void addAll() throws IOException { + BlockingObjectQueue q = newQueue(); + assertTrue(q.addAll(Arrays.asList(new byte[]{1}, new byte[]{2}))); + assertArrayEquals(new byte[]{1}, q.remove()); + assertArrayEquals(new byte[]{2}, q.remove()); + assertTrue(q.isEmpty()); + assertFalse(q.addAll(Collections.emptyList())); + } + + @Test public void removeAll() throws IOException { + BlockingObjectQueue q = newQueue(); + assertTrue(q.add(new byte[]{1})); + assertTrue(q.add(new byte[]{2})); + assertTrue(q.add(new byte[]{3})); + + List remove = Arrays.asList(new byte[]{1}, new byte[]{2}); + assertTrue(q.removeAll(remove)); + assertArrayEquals(new byte[]{3}, q.remove()); + assertTrue(q.isEmpty()); + } + + @Test public void retainAll() throws IOException { + BlockingObjectQueue q = newQueue(); + assertTrue(q.add(new byte[]{1})); + assertTrue(q.add(new byte[]{2})); + assertTrue(q.add(new byte[]{3})); + + List retain = Arrays.asList(new byte[]{2}, new byte[]{3}); + assertTrue(q.retainAll(retain)); + assertArrayEquals(new byte[]{2}, q.remove()); + assertArrayEquals(new byte[]{3}, q.remove()); + assertTrue(q.isEmpty()); + } + + @Test public void retainAllListEmpty() throws IOException { + List retainNone = Collections.emptyList(); + + BlockingObjectQueue empty = newQueue(); + assertFalse(empty.retainAll(retainNone)); + + BlockingObjectQueue nonEmpty = newQueue(); + assertTrue(nonEmpty.add(new byte[]{1})); + assertTrue(nonEmpty.add(new byte[]{2})); + assertTrue(nonEmpty.add(new byte[]{3})); + assertTrue(nonEmpty.retainAll(retainNone)); + assertTrue(nonEmpty.isEmpty()); + } + + @Test public void clear() throws IOException { + BlockingObjectQueue q = newQueue(); + assertTrue(q.add(new byte[]{1})); + assertTrue(q.add(new byte[]{2})); + assertEquals(2, q.size()); + q.clear(); + assertEquals(0, q.size()); + } + + @Test public void create() throws IOException { + QueueFile qf = newQueueFile(); + BlockingObjectQueue q = BlockingObjectQueue.create(qf, new NoOpConverter()); + assertTrue(q.queue() instanceof FileObjectQueue); + } + + @Test public void createByteArray() throws IOException { + QueueFile qf = newQueueFile(); + BlockingObjectQueue q = BlockingObjectQueue.create(qf); + assertTrue(q.queue() instanceof QueueFile); + } + + @Test public void close() throws IOException { + BlockingObjectQueue q = newQueue(); + assertTrue(q.queue() instanceof QueueFile); + QueueFile qf = (QueueFile) q.queue(); + q.close(); + assertTrue(qf.closed); + } + + @Test(expected = IOException.class) + public void ioException() { + BlockingObjectQueue q = new BlockingObjectQueue<>(new BrokenQueueFile()); + q.add(null); + q.remove(); + } + + private static final class BrokenQueueFile extends ObjectQueue { + public int size = 0; + + @Override public int size() { + return size; + } + + @Override public void add(Object entry) throws IOException { + throw new IOException("test"); + } + + @Nullable @Override public Object peek() throws IOException { + throw new IOException("test"); + } + + @Override public void remove(int n) throws IOException { + throw new IOException("test"); + } + + @Override public void close() throws IOException { + throw new IOException("test"); + } + + @Override public Iterator iterator() { + return null; + } + } + + private static final class NoOpConverter implements ObjectQueue.Converter { + @Override public Object from(byte[] source) { + return null; + } + + @Override public void toStream(Object value, OutputStream sink) { + + } + } +}