From db7682187d57dec51a7b736adfcd3f2c396c33b9 Mon Sep 17 00:00:00 2001 From: Phillip Schichtel Date: Thu, 25 Oct 2018 01:37:54 +0200 Subject: [PATCH 1/8] Closes #198: Implement the BlockingQueue as a wrapper around the QueueFile The resulting BlockingFileQueue is thread-safe and unbounded. Thread-safety is implemented with a single lock around all operations against the backing queue. --- .../com/squareup/tape2/BlockingFileQueue.java | 413 ++++++++++++++++++ .../squareup/tape2/BlockingFileQueueTest.java | 367 ++++++++++++++++ 2 files changed, 780 insertions(+) create mode 100644 tape/src/main/java/com/squareup/tape2/BlockingFileQueue.java create mode 100644 tape/src/test/java/com/squareup/tape2/BlockingFileQueueTest.java diff --git a/tape/src/main/java/com/squareup/tape2/BlockingFileQueue.java b/tape/src/main/java/com/squareup/tape2/BlockingFileQueue.java new file mode 100644 index 00000000..745fe1bf --- /dev/null +++ b/tape/src/main/java/com/squareup/tape2/BlockingFileQueue.java @@ -0,0 +1,413 @@ +package com.squareup.tape2; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * This is a simple and unbounded @{@link BlockingQueue} implementation as a wrapper around a @{@link QueueFile}. + * Thread safety is implemented using a single lock around all operations to the backing @{@link QueueFile}. + */ +public class BlockingFileQueue implements BlockingQueue { + + private final Lock lock = new ReentrantLock(); + + private final Condition nonEmpty = lock.newCondition(); + + private final QueueFile queue; + + /** + * Creates a new @{@link BlockingQueue} of type {@code byte[]} backed by the given @{@link QueueFile}. + * + * @param queue the queue file which should not be shared to other places + * */ + public BlockingFileQueue(QueueFile queue) { + this.queue = queue; + } + + @Override public boolean add(byte[] bytes) { + lock.lock(); + try { + queue.add(bytes); + nonEmpty.signal(); + return true; + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + lock.unlock(); + } + } + + @Override public boolean offer(byte[] bytes) { + return add(bytes); + } + + @Override public void put(byte[] bytes) { + add(bytes); + } + + @Override public boolean offer(byte[] bytes, long timeout, TimeUnit unit) { + return add(bytes); + } + + @Override public byte[] take() throws InterruptedException { + lock.lock(); + try { + while (queue.isEmpty()) { + nonEmpty.await(); + } + byte[] peek = queue.peek(); + if (peek == null) { + throw new IllegalStateException("Queue empty!"); + } + queue.remove(); + return peek; + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + lock.unlock(); + } + } + + @Override public byte[] poll(long timeout, TimeUnit unit) throws InterruptedException { + lock.lock(); + try { + long timeoutNanos = unit.toNanos(timeout); + while (queue.isEmpty() && timeoutNanos > 0) { + timeoutNanos = nonEmpty.awaitNanos(timeoutNanos); + } + byte[] peek = queue.peek(); + if (peek == null) { + throw new NoSuchElementException(); + } + queue.remove(); + return peek; + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + lock.unlock(); + } + } + + @Override public int remainingCapacity() { + return Integer.MAX_VALUE; // as per BlockingQueue javadoc for unbounded queues + } + + /** The backing @{@link QueueFile} only supports removing the head, so this will only work if head matches. */ + @Override public boolean remove(Object o) { + if (o == null) { + return false; + } + if (o.getClass() != byte[].class && o.getClass() != Byte[].class) { + return false; + } + byte[] remove = (byte[])o; + lock.lock(); + try { + if (queue.isEmpty()) { + return false; + } + Iterator it = queue.iterator(); + while (it.hasNext()) { + if (Arrays.equals(it.next(), remove)) { + it.remove(); + return true; + } + } + return false; + } finally { + lock.unlock(); + } + } + + @Override public boolean contains(Object o) { + if (o == null) { + return false; + } + if (o.getClass() != byte[].class && o.getClass() != Byte[].class) { + return false; + } + byte[] check = (byte[])o; + lock.lock(); + try { + for (byte[] entry : queue) { + if (Arrays.equals(entry, check)) { + return true; + } + } + return false; + } finally { + lock.unlock(); + } + } + + @Override public int drainTo(Collection c) { + lock.lock(); + try { + int size = queue.size(); + Iterator it = queue.iterator(); + while (it.hasNext()) { + c.add(it.next()); + it.remove(); + } + return size; + } finally { + lock.unlock(); + } + } + + @Override public int drainTo(Collection c, int maxElements) { + if (maxElements == 0) { + return 0; + } + lock.lock(); + try { + Iterator it = queue.iterator(); + int i = 0; + while (it.hasNext() && i < maxElements) { + c.add(it.next()); + it.remove(); + i++; + } + return i; + } finally { + lock.unlock(); + } + } + + @Override public byte[] remove() { + lock.lock(); + try { + if (queue.isEmpty()) { + throw new NoSuchElementException(); + } + byte[] peek = queue.peek(); + queue.remove(); + return peek; + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + lock.unlock(); + } + } + + @Override public byte[] poll() { + lock.lock(); + try { + if (queue.isEmpty()) { + return null; + } + byte[] peek = queue.peek(); + queue.remove(); + return peek; + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + lock.unlock(); + } + } + + @Override public byte[] element() { + lock.lock(); + try { + if (queue.isEmpty()) { + throw new NoSuchElementException(); + } + return queue.peek(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + lock.unlock(); + } + } + + @Override public byte[] peek() { + lock.lock(); + try { + if (queue.isEmpty()) { + return null; + } + return queue.peek(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + lock.unlock(); + } + } + + /** + * This overload is an addition to the @{@link BlockingQueue} interface similar to the {@link #poll(long, TimeUnit)} + * method, a blocking peek operation with a timeout. + * + * @param timeout the timeout + * @param unit the time unit of the timeout + * @return the head of this queue, or {@code null} if this queue is empty + * @see #peek() for more information + * @throws InterruptedException if interrupted while waiting + */ + public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException { + lock.lock(); + try { + long timeoutNanos = unit.toNanos(timeout); + while (queue.isEmpty() && timeoutNanos > 0) { + timeoutNanos = nonEmpty.awaitNanos(timeoutNanos); + } + byte[] peek = queue.peek(); + if (peek == null) { + throw new NoSuchElementException(); + } + return peek; + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + lock.unlock(); + } + } + + @Override public int size() { + lock.lock(); + try { + return queue.size(); + } finally { + lock.unlock(); + } + } + + @Override public boolean isEmpty() { + lock.lock(); + try { + return queue.isEmpty(); + } finally { + lock.unlock(); + } + } + + @Override public Iterator iterator() { + lock.lock(); + try { + return queue.iterator(); + } finally { + lock.unlock(); + } + } + + @Override public Object[] toArray() { + lock.lock(); + try { + Object[] out = new Object[queue.size()]; + int i = 0; + for (byte[] e : queue) { + out[i++] = e; + } + return out; + } finally { + lock.unlock(); + } + } + + @Override @SuppressWarnings("unchecked") public T[] toArray(T[] a) { + return (T[]) toArray(); + } + + @Override public boolean containsAll(Collection c) { + lock.lock(); + try { + for (Object e : c) { + if (!contains(e)) { + return false; + } + } + return true; + } finally { + lock.unlock(); + } + } + + @Override public boolean addAll(Collection c) { + if (c.isEmpty()) { + return false; + } + lock.lock(); + try { + for (byte[] e : c) { + queue.add(e); + } + nonEmpty.signal(); + return true; + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + lock.unlock(); + } + } + + /** The backing @{@link QueueFile} only supports removing the head, so this will only work if head matches. */ + @Override public boolean removeAll(Collection c) { + lock.lock(); + boolean changed = false; + try { + for (Object e : c) { + if (remove(e)) { + changed = true; + } + } + return changed; + } finally { + lock.unlock(); + } + } + + private static boolean arrayContained(Collection haystack, byte[] needle) { + for (Object o : haystack) { + byte[] byteArray = (byte[])o; + if (Arrays.equals(byteArray, needle)) { + return true; + } + } + return false; + } + + /** The backing @{@link QueueFile} only supports removing the head, so this will only work if head matches. */ + @Override public boolean retainAll(Collection c) { + lock.lock(); + if (c.isEmpty()) { + if (queue.isEmpty()) { + return false; + } else { + clear(); + return true; + } + } + try { + boolean changed = false; + Iterator it = queue.iterator(); + while (it.hasNext()) { + if (!arrayContained(c, it.next())) { + it.remove(); + changed = true; + } + } + return changed; + } finally { + lock.unlock(); + } + } + + @Override public void clear() { + lock.lock(); + try { + queue.clear(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + lock.unlock(); + } + } +} diff --git a/tape/src/test/java/com/squareup/tape2/BlockingFileQueueTest.java b/tape/src/test/java/com/squareup/tape2/BlockingFileQueueTest.java new file mode 100644 index 00000000..2c7f04d8 --- /dev/null +++ b/tape/src/test/java/com/squareup/tape2/BlockingFileQueueTest.java @@ -0,0 +1,367 @@ +package com.squareup.tape2; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +public class BlockingFileQueueTest { + + @Rule public TemporaryFolder folder = new TemporaryFolder(); + private File file; + + @Before public void setUp() throws Exception { + File parent = folder.getRoot(); + file = new File(parent, "blocking-queue-file"); + } + + private QueueFile newQueueFile() throws IOException { + return new QueueFile.Builder(file).build(); + } + + private BlockingFileQueue newQueue() throws IOException { + return new BlockingFileQueue(newQueueFile()); + } + + @Test public void add() throws IOException { + BlockingFileQueue q = newQueue(); + assertEquals(0, q.size()); + q.add(new byte[]{1, 2}); + assertArrayEquals(new byte[]{1, 2}, q.peek()); + assertEquals(1, q.size()); + } + + @Test public void offer() throws IOException { + BlockingFileQueue q = newQueue(); + assertEquals(0, q.size()); + q.offer(new byte[]{1, 2}); + assertArrayEquals(new byte[]{1, 2}, q.peek()); + assertEquals(1, q.size()); + } + + @Test public void put() throws IOException { + BlockingFileQueue q = newQueue(); + assertEquals(0, q.size()); + q.put(new byte[]{1, 2}); + assertArrayEquals(new byte[]{1, 2}, q.peek()); + assertEquals(1, q.size()); + } + + @Test public void offer1() throws IOException { + BlockingFileQueue q = newQueue(); + assertEquals(0, q.size()); + q.offer(new byte[]{1, 2}, 1, TimeUnit.SECONDS); + assertArrayEquals(new byte[]{1, 2}, q.peek()); + assertEquals(1, q.size()); + } + + @Test public void takeEmpty() throws IOException, InterruptedException { + final BlockingFileQueue q = newQueue(); + Thread t = new Thread(new Runnable() { + @Override public void run() { + try { + q.take(); + } catch (InterruptedException ignored) { + return; + } + Assert.fail("The take operation should have been interrupted and this fail should not be called!"); + } + }); + t.start(); + Thread.sleep(1000); + t.interrupt(); + } + + @Test public void takeNonEmpty() throws IOException, InterruptedException { + BlockingFileQueue q = newQueue(); + q.put(new byte[]{1, 2}); + assertArrayEquals(new byte[]{1, 2}, q.take()); + } + + @Test public void pollEmpty() throws IOException { + BlockingFileQueue q = newQueue(); + assertNull(q.poll()); + } + + @Test public void pollNonEmpty() throws IOException { + BlockingFileQueue q = newQueue(); + q.put(new byte[]{1, 2}); + assertArrayEquals(new byte[]{1, 2}, q.poll()); + } + + @Test public void remainingCapacity() throws IOException { + BlockingFileQueue q = newQueue(); + assertEquals(0, q.size()); + assertEquals(Integer.MAX_VALUE, q.remainingCapacity()); + q.add(new byte[]{}); + assertEquals(1, q.size()); + assertEquals(Integer.MAX_VALUE, q.remainingCapacity()); + } + + @Test public void remove() throws IOException { + BlockingFileQueue q = newQueue(); + assertEquals(0, q.size()); + assertTrue(q.add(new byte[]{1})); + assertEquals(1, q.size()); + assertTrue(q.add(new byte[]{2})); + assertEquals(2, q.size()); + assertArrayEquals(new byte[]{1}, q.remove()); + assertEquals(1, q.size()); + assertArrayEquals(new byte[]{2}, q.remove()); + assertEquals(0, q.size()); + } + + @Test public void contains() throws IOException { + BlockingFileQueue q = newQueue(); + assertEquals(0, q.size()); + assertTrue(q.add(new byte[]{1})); + assertEquals(1, q.size()); + assertTrue(q.add(new byte[]{2})); + assertEquals(2, q.size()); + + assertTrue(q.contains(new byte[] {1})); + assertTrue(q.contains(new byte[] {2})); + assertFalse(q.contains(new byte[] {3})); + } + + @Test public void drainTo() throws IOException { + BlockingFileQueue q = newQueue(); + List ref = Arrays.asList(new byte[]{1}, new byte[]{2}); + assertTrue(q.addAll(ref)); + assertEquals(ref.size(), q.size()); + + List out = new ArrayList<>(); + assertEquals(ref.size(), q.drainTo(out)); + assertEquals(ref.size(), out.size()); + assertTrue(q.isEmpty()); + assertArrayEquals(ref.toArray(), out.toArray()); + } + + @Test public void drainTo1() throws IOException { + BlockingFileQueue q = newQueue(); + List ref = Arrays.asList(new byte[]{1}, new byte[]{2}, new byte[]{3}); + assertTrue(q.addAll(ref)); + assertEquals(ref.size(), q.size()); + + int drain = ref.size() - 1; + List out = new ArrayList<>(); + assertEquals(drain, q.drainTo(out, drain)); + assertEquals(drain, out.size()); + assertEquals(1, q.size()); + assertArrayEquals(ref.subList(0, drain).toArray(), out.toArray()); + } + + @Test public void drainTo1TooSmall() throws IOException { + BlockingFileQueue q = newQueue(); + List ref = Arrays.asList(new byte[]{1}, new byte[]{2}); + assertTrue(q.addAll(ref)); + assertEquals(ref.size(), q.size()); + + List out = new ArrayList<>(); + assertEquals(ref.size(), q.drainTo(out, Integer.MAX_VALUE)); + assertEquals(ref.size(), out.size()); + assertTrue(q.isEmpty()); + assertArrayEquals(ref.toArray(), out.toArray()); + } + + @Test public void remove1() throws IOException { + byte[] o1 = {1}; + byte[] o2 = {2}; + + BlockingFileQueue q = newQueue(); + assertEquals(0, q.size()); + assertTrue(q.add(o1)); + assertEquals(1, q.size()); + assertTrue(q.add(o2)); + assertEquals(2, q.size()); + + assertFalse(q.remove(new byte[] {3})); + assertEquals(2, q.size()); + assertTrue(q.remove(o1)); + assertEquals(1, q.size()); + assertTrue(q.remove(o2)); + assertEquals(0, q.size()); + } + + @Test(expected = UnsupportedOperationException.class) + public void remove1NotHead() throws IOException { + byte[] o1 = {1}; + byte[] o2 = {2}; + + BlockingFileQueue q = newQueue(); + assertEquals(0, q.size()); + assertTrue(q.add(o1)); + assertEquals(1, q.size()); + assertTrue(q.add(o2)); + assertEquals(2, q.size()); + + assertTrue(q.remove(o2)); + } + + @Test(expected = NoSuchElementException.class) + public void poll1Empty() throws IOException, InterruptedException { + BlockingFileQueue q = newQueue(); + q.poll(1, TimeUnit.SECONDS); + } + + @Test public void poll1NonEmpty() throws IOException, InterruptedException { + BlockingFileQueue q = newQueue(); + q.put(new byte[]{1, 2}); + assertArrayEquals(new byte[]{1, 2}, q.poll(1, TimeUnit.SECONDS)); + } + + @Test(expected = NoSuchElementException.class) + public void elementEmpty() throws IOException { + BlockingFileQueue q = newQueue(); + q.element(); + } + + @Test public void elementNonEmpty() throws IOException { + BlockingFileQueue q = newQueue(); + assertTrue(q.add(new byte[] {1})); + assertEquals(1, q.size()); + assertArrayEquals(new byte[] {1}, q.element()); + } + + @Test public void peekEmpty() throws IOException { + BlockingFileQueue q = newQueue(); + assertNull(q.peek()); + } + + @Test public void peekNonEmpty() throws IOException { + BlockingFileQueue q = newQueue(); + assertTrue(q.add(new byte[] {1})); + assertEquals(1, q.size()); + assertArrayEquals(new byte[] {1}, q.peek()); + } + + @Test(expected = NoSuchElementException.class) + public void peek1Empty() throws IOException, InterruptedException { + BlockingFileQueue q = newQueue(); + q.peek(1, TimeUnit.SECONDS); + } + + @Test public void peek1NonEmpty() throws IOException, InterruptedException { + BlockingFileQueue q = newQueue(); + assertTrue(q.add(new byte[] {1})); + assertEquals(1, q.size()); + assertArrayEquals(new byte[] {1}, q.peek(1, TimeUnit.SECONDS)); + } + + @Test public void size() throws IOException { + BlockingFileQueue q = newQueue(); + assertEquals(0, q.size()); + assertTrue(q.add(new byte[] {1})); + assertEquals(1, q.size());; + assertTrue(q.add(new byte[] {2})); + assertEquals(2, q.size()); + } + + @Test public void isEmpty() throws IOException { + BlockingFileQueue q = newQueue(); + assertTrue(q.isEmpty()); + assertTrue(q.add(new byte[] {1})); + assertFalse(q.isEmpty()); + assertNotNull(q.remove()); + assertTrue(q.isEmpty()); + } + + @Test public void iterator() throws IOException { + byte[][] ref = {{1}, {2}, {3}}; + BlockingFileQueue q = newQueue(); + for (byte[] b : ref) { + q.add(b); + } + + int i = 0; + Iterator it = q.iterator(); + while (it.hasNext()) { + assertArrayEquals(ref[i++], it.next()); + } + + assertEquals(ref.length, i); + } + + @Test public void toArray() throws IOException { + byte[][] bytes = {{1}, {2}}; + BlockingFileQueue q = newQueue(); + assertTrue(q.add(new byte[] {1})); + assertTrue(q.add(new byte[] {2})); + + assertArrayEquals(bytes, q.toArray()); + } + + @Test public void toArray1() throws IOException { + byte[][] bytes = {{1}, {2}}; + BlockingFileQueue q = newQueue(); + assertTrue(q.add(new byte[] {1})); + assertTrue(q.add(new byte[] {2})); + + assertArrayEquals(bytes, q.toArray(new byte[0][])); + } + + @Test public void containsAll() throws IOException { + List bytes = Arrays.asList(new byte[]{1}, new byte[]{2}); + BlockingFileQueue q = newQueue(); + assertFalse(q.containsAll(bytes)); + assertTrue(q.add(new byte[] {1})); + assertEquals(1, q.size()); + assertFalse(q.containsAll(bytes)); + assertTrue(q.add(new byte[] {2})); + assertEquals(2, q.size()); + assertTrue(q.containsAll(bytes)); + assertTrue(q.add(new byte[] {3})); + assertEquals(3, q.size()); + assertTrue(q.containsAll(bytes)); + } + + @Test public void addAll() throws IOException { + BlockingFileQueue q = newQueue(); + assertTrue(q.addAll(Arrays.asList(new byte[] {1}, new byte[] {2}))); + assertArrayEquals(new byte[] {1}, q.remove()); + assertArrayEquals(new byte[] {2}, q.remove()); + assertTrue(q.isEmpty()); + } + + @Test public void removeAll() throws IOException { + BlockingFileQueue q = newQueue(); + assertTrue(q.add(new byte[] {1})); + assertTrue(q.add(new byte[] {2})); + assertTrue(q.add(new byte[] {3})); + + List remove = Arrays.asList(new byte[] {1}, new byte[] {2}); + assertTrue(q.removeAll(remove)); + assertArrayEquals(new byte[] {3}, q.remove()); + assertTrue(q.isEmpty()); + } + + @Test public void retainAll() throws IOException { + BlockingFileQueue q = newQueue(); + assertTrue(q.add(new byte[] {1})); + assertTrue(q.add(new byte[] {2})); + assertTrue(q.add(new byte[] {3})); + + List retain = Arrays.asList(new byte[] {2}, new byte[] {3}); + assertTrue(q.retainAll(retain)); + assertArrayEquals(new byte[] {2}, q.remove()); + assertArrayEquals(new byte[] {3}, q.remove()); + assertTrue(q.isEmpty()); + } + + @Test public void clear() throws IOException { + BlockingFileQueue q = newQueue(); + assertTrue(q.add(new byte[] {1})); + assertTrue(q.add(new byte[] {2})); + assertEquals(2, q.size()); + q.clear(); + assertEquals(0, q.size()); + } +} \ No newline at end of file From c799abe51cf982469c7a4c584fab7ee24977708c Mon Sep 17 00:00:00 2001 From: Phillip Schichtel Date: Thu, 25 Oct 2018 02:40:07 +0200 Subject: [PATCH 2/8] Closes #198: Implement the BlockingQueue as a wrapper around the QueueFile Switched to an ObjectQueue as the backing store and delegate closable. --- ...ileQueue.java => BlockingObjectQueue.java} | 150 +++++++++------- .../squareup/tape2/ByteArrayConverter.java | 20 +++ ...Test.java => BlockingObjectQueueTest.java} | 170 +++++++++--------- 3 files changed, 196 insertions(+), 144 deletions(-) rename tape/src/main/java/com/squareup/tape2/{BlockingFileQueue.java => BlockingObjectQueue.java} (64%) create mode 100644 tape/src/main/java/com/squareup/tape2/ByteArrayConverter.java rename tape/src/test/java/com/squareup/tape2/{BlockingFileQueueTest.java => BlockingObjectQueueTest.java} (66%) diff --git a/tape/src/main/java/com/squareup/tape2/BlockingFileQueue.java b/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java similarity index 64% rename from tape/src/main/java/com/squareup/tape2/BlockingFileQueue.java rename to tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java index 745fe1bf..b47ce7a1 100644 --- a/tape/src/main/java/com/squareup/tape2/BlockingFileQueue.java +++ b/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java @@ -1,10 +1,11 @@ package com.squareup.tape2; +import java.io.Closeable; import java.io.IOException; -import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; @@ -12,30 +13,52 @@ import java.util.concurrent.locks.ReentrantLock; /** - * This is a simple and unbounded @{@link BlockingQueue} implementation as a wrapper around a @{@link QueueFile}. - * Thread safety is implemented using a single lock around all operations to the backing @{@link QueueFile}. + * This is a simple and unbounded {@link BlockingQueue} implementation as a wrapper + * around a {@link QueueFile}. Thread safety is implemented using a single lock around all + * operations to the backing {@link QueueFile}. + * + * @param the element type */ -public class BlockingFileQueue implements BlockingQueue { +public class BlockingObjectQueue implements BlockingQueue, Closeable { private final Lock lock = new ReentrantLock(); private final Condition nonEmpty = lock.newCondition(); - private final QueueFile queue; + private final ObjectQueue queue; + + private BlockingObjectQueue(ObjectQueue queue) { + this.queue = queue; + } /** - * Creates a new @{@link BlockingQueue} of type {@code byte[]} backed by the given @{@link QueueFile}. + * Creates a new {@link BlockingQueue} of type {@code T} backed by the given + * {@link ObjectQueue} of type {@code T}. * - * @param queue the queue file which should not be shared to other places - * */ - public BlockingFileQueue(QueueFile queue) { - this.queue = queue; + * @param qf the queue file which should not be shared to other places + * @param conv the converter used to convert from and to byte arrays + * @param the element type + * @return a BlockObjectQueue implementation + */ + public static BlockingObjectQueue create(QueueFile qf, ObjectQueue.Converter conv) { + return new BlockingObjectQueue<>(new FileObjectQueue<>(qf, conv)); } - @Override public boolean add(byte[] bytes) { + /** + * Creates a new {@link BlockingQueue} of type {@code T} backed by the given + * {@link ObjectQueue} of type {@code T}. + * + * @param qf the queue file which should not be shared to other places + * @return a BlockObjectQueue implementation + */ + public static BlockingObjectQueue create(QueueFile qf) { + return create(qf, ByteArrayConverter.INSTANCE); + } + + @Override public boolean add(E element) { lock.lock(); try { - queue.add(bytes); + queue.add(element); nonEmpty.signal(); return true; } catch (IOException e) { @@ -45,25 +68,25 @@ public BlockingFileQueue(QueueFile queue) { } } - @Override public boolean offer(byte[] bytes) { - return add(bytes); + @Override public boolean offer(E element) { + return add(element); } - @Override public void put(byte[] bytes) { - add(bytes); + @Override public void put(E element) { + add(element); } - @Override public boolean offer(byte[] bytes, long timeout, TimeUnit unit) { - return add(bytes); + @Override public boolean offer(E element, long timeout, TimeUnit unit) { + return add(element); } - @Override public byte[] take() throws InterruptedException { + @Override public E take() throws InterruptedException { lock.lock(); try { while (queue.isEmpty()) { nonEmpty.await(); } - byte[] peek = queue.peek(); + E peek = queue.peek(); if (peek == null) { throw new IllegalStateException("Queue empty!"); } @@ -76,14 +99,14 @@ public BlockingFileQueue(QueueFile queue) { } } - @Override public byte[] poll(long timeout, TimeUnit unit) throws InterruptedException { + @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException { lock.lock(); try { long timeoutNanos = unit.toNanos(timeout); while (queue.isEmpty() && timeoutNanos > 0) { timeoutNanos = nonEmpty.awaitNanos(timeoutNanos); } - byte[] peek = queue.peek(); + E peek = queue.peek(); if (peek == null) { throw new NoSuchElementException(); } @@ -100,23 +123,22 @@ public BlockingFileQueue(QueueFile queue) { return Integer.MAX_VALUE; // as per BlockingQueue javadoc for unbounded queues } - /** The backing @{@link QueueFile} only supports removing the head, so this will only work if head matches. */ + /** + * The underlying {@link QueueFile} only supports removing the head, so this will only work if + * head matches. + * */ @Override public boolean remove(Object o) { if (o == null) { return false; } - if (o.getClass() != byte[].class && o.getClass() != Byte[].class) { - return false; - } - byte[] remove = (byte[])o; lock.lock(); try { if (queue.isEmpty()) { return false; } - Iterator it = queue.iterator(); + Iterator it = queue.iterator(); while (it.hasNext()) { - if (Arrays.equals(it.next(), remove)) { + if (Objects.deepEquals(it.next(), o)) { it.remove(); return true; } @@ -128,17 +150,10 @@ public BlockingFileQueue(QueueFile queue) { } @Override public boolean contains(Object o) { - if (o == null) { - return false; - } - if (o.getClass() != byte[].class && o.getClass() != Byte[].class) { - return false; - } - byte[] check = (byte[])o; lock.lock(); try { - for (byte[] entry : queue) { - if (Arrays.equals(entry, check)) { + for (E entry : queue) { + if (Objects.deepEquals(entry, o)) { return true; } } @@ -148,11 +163,11 @@ public BlockingFileQueue(QueueFile queue) { } } - @Override public int drainTo(Collection c) { + @Override public int drainTo(Collection c) { lock.lock(); try { int size = queue.size(); - Iterator it = queue.iterator(); + Iterator it = queue.iterator(); while (it.hasNext()) { c.add(it.next()); it.remove(); @@ -163,13 +178,13 @@ public BlockingFileQueue(QueueFile queue) { } } - @Override public int drainTo(Collection c, int maxElements) { + @Override public int drainTo(Collection c, int maxElements) { if (maxElements == 0) { return 0; } lock.lock(); try { - Iterator it = queue.iterator(); + Iterator it = queue.iterator(); int i = 0; while (it.hasNext() && i < maxElements) { c.add(it.next()); @@ -182,13 +197,13 @@ public BlockingFileQueue(QueueFile queue) { } } - @Override public byte[] remove() { + @Override public E remove() { lock.lock(); try { if (queue.isEmpty()) { throw new NoSuchElementException(); } - byte[] peek = queue.peek(); + E peek = queue.peek(); queue.remove(); return peek; } catch (IOException e) { @@ -198,13 +213,13 @@ public BlockingFileQueue(QueueFile queue) { } } - @Override public byte[] poll() { + @Override public E poll() { lock.lock(); try { if (queue.isEmpty()) { return null; } - byte[] peek = queue.peek(); + E peek = queue.peek(); queue.remove(); return peek; } catch (IOException e) { @@ -214,7 +229,7 @@ public BlockingFileQueue(QueueFile queue) { } } - @Override public byte[] element() { + @Override public E element() { lock.lock(); try { if (queue.isEmpty()) { @@ -228,7 +243,7 @@ public BlockingFileQueue(QueueFile queue) { } } - @Override public byte[] peek() { + @Override public E peek() { lock.lock(); try { if (queue.isEmpty()) { @@ -243,8 +258,8 @@ public BlockingFileQueue(QueueFile queue) { } /** - * This overload is an addition to the @{@link BlockingQueue} interface similar to the {@link #poll(long, TimeUnit)} - * method, a blocking peek operation with a timeout. + * This overload is an addition to the {@link BlockingQueue} interface similar to the + * {@link #poll(long, TimeUnit)} method, a blocking peek operation with a timeout. * * @param timeout the timeout * @param unit the time unit of the timeout @@ -252,14 +267,14 @@ public BlockingFileQueue(QueueFile queue) { * @see #peek() for more information * @throws InterruptedException if interrupted while waiting */ - public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException { + public E peek(long timeout, TimeUnit unit) throws InterruptedException { lock.lock(); try { long timeoutNanos = unit.toNanos(timeout); while (queue.isEmpty() && timeoutNanos > 0) { timeoutNanos = nonEmpty.awaitNanos(timeoutNanos); } - byte[] peek = queue.peek(); + E peek = queue.peek(); if (peek == null) { throw new NoSuchElementException(); } @@ -289,7 +304,7 @@ public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException { } } - @Override public Iterator iterator() { + @Override public Iterator iterator() { lock.lock(); try { return queue.iterator(); @@ -303,7 +318,7 @@ public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException { try { Object[] out = new Object[queue.size()]; int i = 0; - for (byte[] e : queue) { + for (E e : queue) { out[i++] = e; } return out; @@ -330,13 +345,13 @@ public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException { } } - @Override public boolean addAll(Collection c) { + @Override public boolean addAll(Collection c) { if (c.isEmpty()) { return false; } lock.lock(); try { - for (byte[] e : c) { + for (E e : c) { queue.add(e); } nonEmpty.signal(); @@ -348,7 +363,10 @@ public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException { } } - /** The backing @{@link QueueFile} only supports removing the head, so this will only work if head matches. */ + /** + * The underlying {@link QueueFile} only supports removing the head, so this will only work if + * head matches. + * */ @Override public boolean removeAll(Collection c) { lock.lock(); boolean changed = false; @@ -364,17 +382,19 @@ public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException { } } - private static boolean arrayContained(Collection haystack, byte[] needle) { + private static boolean contains(Iterable haystack, Object needle) { for (Object o : haystack) { - byte[] byteArray = (byte[])o; - if (Arrays.equals(byteArray, needle)) { + if (Objects.deepEquals(o, needle)) { return true; } } return false; } - /** The backing @{@link QueueFile} only supports removing the head, so this will only work if head matches. */ + /** + * The underlying {@link QueueFile} only supports removing the head, so this will only work if + * head matches. + * */ @Override public boolean retainAll(Collection c) { lock.lock(); if (c.isEmpty()) { @@ -387,9 +407,9 @@ private static boolean arrayContained(Collection haystack, byte[] needle) { } try { boolean changed = false; - Iterator it = queue.iterator(); + Iterator it = queue.iterator(); while (it.hasNext()) { - if (!arrayContained(c, it.next())) { + if (!contains(c, it.next())) { it.remove(); changed = true; } @@ -410,4 +430,8 @@ private static boolean arrayContained(Collection haystack, byte[] needle) { lock.unlock(); } } + + @Override public void close() throws IOException { + queue.close(); + } } diff --git a/tape/src/main/java/com/squareup/tape2/ByteArrayConverter.java b/tape/src/main/java/com/squareup/tape2/ByteArrayConverter.java new file mode 100644 index 00000000..c568a9af --- /dev/null +++ b/tape/src/main/java/com/squareup/tape2/ByteArrayConverter.java @@ -0,0 +1,20 @@ +package com.squareup.tape2; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * This is a simple "NO-OP" converter for byte[] queues which don't actually need a conversion. + */ +public class ByteArrayConverter implements ObjectQueue.Converter { + + public static final ByteArrayConverter INSTANCE = new ByteArrayConverter(); + + @Override public byte[] from(byte[] source) { + return source; + } + + @Override public void toStream(byte[] value, OutputStream sink) throws IOException { + sink.write(value); + } +} diff --git a/tape/src/test/java/com/squareup/tape2/BlockingFileQueueTest.java b/tape/src/test/java/com/squareup/tape2/BlockingObjectQueueTest.java similarity index 66% rename from tape/src/test/java/com/squareup/tape2/BlockingFileQueueTest.java rename to tape/src/test/java/com/squareup/tape2/BlockingObjectQueueTest.java index 2c7f04d8..32993be8 100644 --- a/tape/src/test/java/com/squareup/tape2/BlockingFileQueueTest.java +++ b/tape/src/test/java/com/squareup/tape2/BlockingObjectQueueTest.java @@ -8,12 +8,21 @@ import java.io.File; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; -public class BlockingFileQueueTest { +public class BlockingObjectQueueTest { @Rule public TemporaryFolder folder = new TemporaryFolder(); private File file; @@ -27,12 +36,12 @@ private QueueFile newQueueFile() throws IOException { return new QueueFile.Builder(file).build(); } - private BlockingFileQueue newQueue() throws IOException { - return new BlockingFileQueue(newQueueFile()); + private BlockingObjectQueue newQueue() throws IOException { + return BlockingObjectQueue.create(newQueueFile()); } @Test public void add() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); q.add(new byte[]{1, 2}); assertArrayEquals(new byte[]{1, 2}, q.peek()); @@ -40,7 +49,7 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void offer() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); q.offer(new byte[]{1, 2}); assertArrayEquals(new byte[]{1, 2}, q.peek()); @@ -48,7 +57,7 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void put() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); q.put(new byte[]{1, 2}); assertArrayEquals(new byte[]{1, 2}, q.peek()); @@ -56,7 +65,7 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void offer1() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); q.offer(new byte[]{1, 2}, 1, TimeUnit.SECONDS); assertArrayEquals(new byte[]{1, 2}, q.peek()); @@ -64,7 +73,7 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void takeEmpty() throws IOException, InterruptedException { - final BlockingFileQueue q = newQueue(); + final BlockingObjectQueue q = newQueue(); Thread t = new Thread(new Runnable() { @Override public void run() { try { @@ -72,7 +81,7 @@ private BlockingFileQueue newQueue() throws IOException { } catch (InterruptedException ignored) { return; } - Assert.fail("The take operation should have been interrupted and this fail should not be called!"); + Assert.fail("The take operation should have been interrupted!"); } }); t.start(); @@ -81,24 +90,24 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void takeNonEmpty() throws IOException, InterruptedException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); q.put(new byte[]{1, 2}); assertArrayEquals(new byte[]{1, 2}, q.take()); } @Test public void pollEmpty() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertNull(q.poll()); } @Test public void pollNonEmpty() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); q.put(new byte[]{1, 2}); assertArrayEquals(new byte[]{1, 2}, q.poll()); } @Test public void remainingCapacity() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); assertEquals(Integer.MAX_VALUE, q.remainingCapacity()); q.add(new byte[]{}); @@ -107,7 +116,7 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void remove() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); assertTrue(q.add(new byte[]{1})); assertEquals(1, q.size()); @@ -120,20 +129,20 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void contains() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); assertTrue(q.add(new byte[]{1})); assertEquals(1, q.size()); assertTrue(q.add(new byte[]{2})); assertEquals(2, q.size()); - assertTrue(q.contains(new byte[] {1})); - assertTrue(q.contains(new byte[] {2})); - assertFalse(q.contains(new byte[] {3})); + assertTrue(q.contains(new byte[]{1})); + assertTrue(q.contains(new byte[]{2})); + assertFalse(q.contains(new byte[]{3})); } @Test public void drainTo() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); List ref = Arrays.asList(new byte[]{1}, new byte[]{2}); assertTrue(q.addAll(ref)); assertEquals(ref.size(), q.size()); @@ -146,7 +155,7 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void drainTo1() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); List ref = Arrays.asList(new byte[]{1}, new byte[]{2}, new byte[]{3}); assertTrue(q.addAll(ref)); assertEquals(ref.size(), q.size()); @@ -160,7 +169,7 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void drainTo1TooSmall() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); List ref = Arrays.asList(new byte[]{1}, new byte[]{2}); assertTrue(q.addAll(ref)); assertEquals(ref.size(), q.size()); @@ -176,14 +185,14 @@ private BlockingFileQueue newQueue() throws IOException { byte[] o1 = {1}; byte[] o2 = {2}; - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); assertTrue(q.add(o1)); assertEquals(1, q.size()); assertTrue(q.add(o2)); assertEquals(2, q.size()); - assertFalse(q.remove(new byte[] {3})); + assertFalse(q.remove(new byte[]{3})); assertEquals(2, q.size()); assertTrue(q.remove(o1)); assertEquals(1, q.size()); @@ -196,7 +205,7 @@ public void remove1NotHead() throws IOException { byte[] o1 = {1}; byte[] o2 = {2}; - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); assertTrue(q.add(o1)); assertEquals(1, q.size()); @@ -208,67 +217,68 @@ public void remove1NotHead() throws IOException { @Test(expected = NoSuchElementException.class) public void poll1Empty() throws IOException, InterruptedException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); q.poll(1, TimeUnit.SECONDS); } @Test public void poll1NonEmpty() throws IOException, InterruptedException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); q.put(new byte[]{1, 2}); assertArrayEquals(new byte[]{1, 2}, q.poll(1, TimeUnit.SECONDS)); } @Test(expected = NoSuchElementException.class) public void elementEmpty() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); q.element(); } @Test public void elementNonEmpty() throws IOException { - BlockingFileQueue q = newQueue(); - assertTrue(q.add(new byte[] {1})); + BlockingObjectQueue q = newQueue(); + assertTrue(q.add(new byte[]{1})); assertEquals(1, q.size()); - assertArrayEquals(new byte[] {1}, q.element()); + assertArrayEquals(new byte[]{1}, q.element()); } @Test public void peekEmpty() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertNull(q.peek()); } @Test public void peekNonEmpty() throws IOException { - BlockingFileQueue q = newQueue(); - assertTrue(q.add(new byte[] {1})); + BlockingObjectQueue q = newQueue(); + assertTrue(q.add(new byte[]{1})); assertEquals(1, q.size()); - assertArrayEquals(new byte[] {1}, q.peek()); + assertArrayEquals(new byte[]{1}, q.peek()); } @Test(expected = NoSuchElementException.class) public void peek1Empty() throws IOException, InterruptedException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); q.peek(1, TimeUnit.SECONDS); } @Test public void peek1NonEmpty() throws IOException, InterruptedException { - BlockingFileQueue q = newQueue(); - assertTrue(q.add(new byte[] {1})); + BlockingObjectQueue q = newQueue(); + assertTrue(q.add(new byte[]{1})); assertEquals(1, q.size()); - assertArrayEquals(new byte[] {1}, q.peek(1, TimeUnit.SECONDS)); + assertArrayEquals(new byte[]{1}, q.peek(1, TimeUnit.SECONDS)); } @Test public void size() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); - assertTrue(q.add(new byte[] {1})); - assertEquals(1, q.size());; - assertTrue(q.add(new byte[] {2})); + assertTrue(q.add(new byte[]{1})); + assertEquals(1, q.size()); + + assertTrue(q.add(new byte[]{2})); assertEquals(2, q.size()); } @Test public void isEmpty() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertTrue(q.isEmpty()); - assertTrue(q.add(new byte[] {1})); + assertTrue(q.add(new byte[]{1})); assertFalse(q.isEmpty()); assertNotNull(q.remove()); assertTrue(q.isEmpty()); @@ -276,10 +286,8 @@ public void peek1Empty() throws IOException, InterruptedException { @Test public void iterator() throws IOException { byte[][] ref = {{1}, {2}, {3}}; - BlockingFileQueue q = newQueue(); - for (byte[] b : ref) { - q.add(b); - } + BlockingObjectQueue q = newQueue(); + q.addAll(Arrays.asList(ref)); int i = 0; Iterator it = q.iterator(); @@ -292,76 +300,76 @@ public void peek1Empty() throws IOException, InterruptedException { @Test public void toArray() throws IOException { byte[][] bytes = {{1}, {2}}; - BlockingFileQueue q = newQueue(); - assertTrue(q.add(new byte[] {1})); - assertTrue(q.add(new byte[] {2})); + BlockingObjectQueue q = newQueue(); + assertTrue(q.add(new byte[]{1})); + assertTrue(q.add(new byte[]{2})); assertArrayEquals(bytes, q.toArray()); } @Test public void toArray1() throws IOException { byte[][] bytes = {{1}, {2}}; - BlockingFileQueue q = newQueue(); - assertTrue(q.add(new byte[] {1})); - assertTrue(q.add(new byte[] {2})); + BlockingObjectQueue q = newQueue(); + assertTrue(q.add(new byte[]{1})); + assertTrue(q.add(new byte[]{2})); assertArrayEquals(bytes, q.toArray(new byte[0][])); } @Test public void containsAll() throws IOException { List bytes = Arrays.asList(new byte[]{1}, new byte[]{2}); - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertFalse(q.containsAll(bytes)); - assertTrue(q.add(new byte[] {1})); + assertTrue(q.add(new byte[]{1})); assertEquals(1, q.size()); assertFalse(q.containsAll(bytes)); - assertTrue(q.add(new byte[] {2})); + assertTrue(q.add(new byte[]{2})); assertEquals(2, q.size()); assertTrue(q.containsAll(bytes)); - assertTrue(q.add(new byte[] {3})); + assertTrue(q.add(new byte[]{3})); assertEquals(3, q.size()); assertTrue(q.containsAll(bytes)); } @Test public void addAll() throws IOException { - BlockingFileQueue q = newQueue(); - assertTrue(q.addAll(Arrays.asList(new byte[] {1}, new byte[] {2}))); - assertArrayEquals(new byte[] {1}, q.remove()); - assertArrayEquals(new byte[] {2}, q.remove()); + BlockingObjectQueue q = newQueue(); + assertTrue(q.addAll(Arrays.asList(new byte[]{1}, new byte[]{2}))); + assertArrayEquals(new byte[]{1}, q.remove()); + assertArrayEquals(new byte[]{2}, q.remove()); assertTrue(q.isEmpty()); } @Test public void removeAll() throws IOException { - BlockingFileQueue q = newQueue(); - assertTrue(q.add(new byte[] {1})); - assertTrue(q.add(new byte[] {2})); - assertTrue(q.add(new byte[] {3})); + BlockingObjectQueue q = newQueue(); + assertTrue(q.add(new byte[]{1})); + assertTrue(q.add(new byte[]{2})); + assertTrue(q.add(new byte[]{3})); - List remove = Arrays.asList(new byte[] {1}, new byte[] {2}); + List remove = Arrays.asList(new byte[]{1}, new byte[]{2}); assertTrue(q.removeAll(remove)); - assertArrayEquals(new byte[] {3}, q.remove()); + assertArrayEquals(new byte[]{3}, q.remove()); assertTrue(q.isEmpty()); } @Test public void retainAll() throws IOException { - BlockingFileQueue q = newQueue(); - assertTrue(q.add(new byte[] {1})); - assertTrue(q.add(new byte[] {2})); - assertTrue(q.add(new byte[] {3})); + BlockingObjectQueue q = newQueue(); + assertTrue(q.add(new byte[]{1})); + assertTrue(q.add(new byte[]{2})); + assertTrue(q.add(new byte[]{3})); - List retain = Arrays.asList(new byte[] {2}, new byte[] {3}); + List retain = Arrays.asList(new byte[]{2}, new byte[]{3}); assertTrue(q.retainAll(retain)); - assertArrayEquals(new byte[] {2}, q.remove()); - assertArrayEquals(new byte[] {3}, q.remove()); + assertArrayEquals(new byte[]{2}, q.remove()); + assertArrayEquals(new byte[]{3}, q.remove()); assertTrue(q.isEmpty()); } @Test public void clear() throws IOException { - BlockingFileQueue q = newQueue(); - assertTrue(q.add(new byte[] {1})); - assertTrue(q.add(new byte[] {2})); + BlockingObjectQueue q = newQueue(); + assertTrue(q.add(new byte[]{1})); + assertTrue(q.add(new byte[]{2})); assertEquals(2, q.size()); q.clear(); assertEquals(0, q.size()); } -} \ No newline at end of file +} From af0795e87728744225c42a386bf3f9e4772ed60c Mon Sep 17 00:00:00 2001 From: Phillip Schichtel Date: Fri, 26 Oct 2018 01:15:28 +0200 Subject: [PATCH 3/8] Closes #198: Implement the BlockingQueue as a wrapper around the QueueFile QueueFile is now an ObjectQueue and can thus be used without overhead in the blocking queue. ObjectQueue's file():QueueFile method has been moved to FileObjectQueue where it fits better as InMemoryObjectQueue doesn't have a backing file. Removed the ByteArrayConverter again as it was the initial attempt to optimize for byte[] queues. --- .../squareup/tape2/BlockingObjectQueue.java | 2 +- .../squareup/tape2/ByteArrayConverter.java | 20 ------------------- .../com/squareup/tape2/FileObjectQueue.java | 2 +- .../squareup/tape2/InMemoryObjectQueue.java | 4 ---- .../java/com/squareup/tape2/ObjectQueue.java | 3 --- .../java/com/squareup/tape2/QueueFile.java | 17 ++++++++-------- 6 files changed, 10 insertions(+), 38 deletions(-) delete mode 100644 tape/src/main/java/com/squareup/tape2/ByteArrayConverter.java diff --git a/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java b/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java index b47ce7a1..11f7ec2c 100644 --- a/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java +++ b/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java @@ -52,7 +52,7 @@ public static BlockingObjectQueue create(QueueFile qf, ObjectQueue.Conver * @return a BlockObjectQueue implementation */ public static BlockingObjectQueue create(QueueFile qf) { - return create(qf, ByteArrayConverter.INSTANCE); + return new BlockingObjectQueue<>(qf); } @Override public boolean add(E element) { diff --git a/tape/src/main/java/com/squareup/tape2/ByteArrayConverter.java b/tape/src/main/java/com/squareup/tape2/ByteArrayConverter.java deleted file mode 100644 index c568a9af..00000000 --- a/tape/src/main/java/com/squareup/tape2/ByteArrayConverter.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.squareup.tape2; - -import java.io.IOException; -import java.io.OutputStream; - -/** - * This is a simple "NO-OP" converter for byte[] queues which don't actually need a conversion. - */ -public class ByteArrayConverter implements ObjectQueue.Converter { - - public static final ByteArrayConverter INSTANCE = new ByteArrayConverter(); - - @Override public byte[] from(byte[] source) { - return source; - } - - @Override public void toStream(byte[] value, OutputStream sink) throws IOException { - sink.write(value); - } -} diff --git a/tape/src/main/java/com/squareup/tape2/FileObjectQueue.java b/tape/src/main/java/com/squareup/tape2/FileObjectQueue.java index d87c9718..84df0d6d 100644 --- a/tape/src/main/java/com/squareup/tape2/FileObjectQueue.java +++ b/tape/src/main/java/com/squareup/tape2/FileObjectQueue.java @@ -19,7 +19,7 @@ final class FileObjectQueue extends ObjectQueue { this.converter = converter; } - @Override public @Nonnull QueueFile file() { + public @Nonnull QueueFile file() { return queueFile; } diff --git a/tape/src/main/java/com/squareup/tape2/InMemoryObjectQueue.java b/tape/src/main/java/com/squareup/tape2/InMemoryObjectQueue.java index 5f983d81..d59c2a1b 100644 --- a/tape/src/main/java/com/squareup/tape2/InMemoryObjectQueue.java +++ b/tape/src/main/java/com/squareup/tape2/InMemoryObjectQueue.java @@ -25,10 +25,6 @@ final class InMemoryObjectQueue extends ObjectQueue { entries = new ArrayDeque<>(); } - @Override public @Nullable QueueFile file() { - return null; - } - @Override public void add(T entry) { if (closed) throw new IllegalStateException("closed"); modCount++; diff --git a/tape/src/main/java/com/squareup/tape2/ObjectQueue.java b/tape/src/main/java/com/squareup/tape2/ObjectQueue.java index 560a8dd7..01e6cb00 100644 --- a/tape/src/main/java/com/squareup/tape2/ObjectQueue.java +++ b/tape/src/main/java/com/squareup/tape2/ObjectQueue.java @@ -25,9 +25,6 @@ public static ObjectQueue createInMemory() { return new InMemoryObjectQueue<>(); } - /** The underlying {@link QueueFile} backing this queue, or null if it's only in memory. */ - public abstract @Nullable QueueFile file(); - /** Returns the number of entries in the queue. */ public abstract int size(); diff --git a/tape/src/main/java/com/squareup/tape2/QueueFile.java b/tape/src/main/java/com/squareup/tape2/QueueFile.java index c1cae7b6..1e382dfa 100644 --- a/tape/src/main/java/com/squareup/tape2/QueueFile.java +++ b/tape/src/main/java/com/squareup/tape2/QueueFile.java @@ -15,7 +15,6 @@ */ package com.squareup.tape2; -import java.io.Closeable; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -52,7 +51,7 @@ * * @author Bob Lee (bob@squareup.com) */ -public final class QueueFile implements Closeable, Iterable { +public final class QueueFile extends ObjectQueue { /** Leading bit set to 1 indicating a versioned header and the version of 1. */ private static final int VERSIONED_HEADER = 0x80000001; @@ -358,7 +357,7 @@ private void ringErase(long position, long length) throws IOException { * * @param data to copy bytes from */ - public void add(byte[] data) throws IOException { + @Override public void add(byte[] data) throws IOException { add(data, 0, data.length); } @@ -425,7 +424,7 @@ private long remainingBytes() { } /** Returns true if this queue contains no entries. */ - public boolean isEmpty() { + @Override public boolean isEmpty() { return elementCount == 0; } @@ -488,7 +487,7 @@ private void setLength(long newLength) throws IOException { } /** Reads the eldest element. Returns null if the queue is empty. */ - public @Nullable byte[] peek() throws IOException { + @Override public @Nullable byte[] peek() throws IOException { if (closed) throw new IllegalStateException("closed"); if (isEmpty()) return null; int length = first.length; @@ -582,7 +581,7 @@ private void checkForComodification() { } /** Returns the number of elements in this queue. */ - public int size() { + @Override public int size() { return elementCount; } @@ -591,7 +590,7 @@ public int size() { * * @throws NoSuchElementException if the queue is empty */ - public void remove() throws IOException { + @Override public void remove() throws IOException { remove(1); } @@ -600,7 +599,7 @@ public void remove() throws IOException { * * @throws NoSuchElementException if the queue is empty */ - public void remove(int n) throws IOException { + @Override public void remove(int n) throws IOException { if (n < 0) { throw new IllegalArgumentException("Cannot remove negative (" + n + ") number of elements."); } @@ -644,7 +643,7 @@ public void remove(int n) throws IOException { } /** Clears this queue. Truncates the file to the initial size. */ - public void clear() throws IOException { + @Override public void clear() throws IOException { if (closed) throw new IllegalStateException("closed"); // Commit the header. From fce3a3a6b58c8b1404c573e36d753b4c0aefb761 Mon Sep 17 00:00:00 2001 From: Phillip Schichtel Date: Fri, 26 Oct 2018 01:24:23 +0200 Subject: [PATCH 4/8] Closes #198: Implement the BlockingQueue as a wrapper around the QueueFile sneakyfy the IO exception as done in other places. --- .../squareup/tape2/BlockingObjectQueue.java | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java b/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java index 11f7ec2c..e3383a75 100644 --- a/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java +++ b/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java @@ -62,7 +62,8 @@ public static BlockingObjectQueue create(QueueFile qf) { nonEmpty.signal(); return true; } catch (IOException e) { - throw new RuntimeException(e); + QueueFile.getSneakyThrowable(e); + return false; } finally { lock.unlock(); } @@ -88,12 +89,14 @@ public static BlockingObjectQueue create(QueueFile qf) { } E peek = queue.peek(); if (peek == null) { + // this won't happen unless the backing queue has been shared. throw new IllegalStateException("Queue empty!"); } queue.remove(); return peek; } catch (IOException e) { - throw new RuntimeException(e); + QueueFile.getSneakyThrowable(e); + return null; } finally { lock.unlock(); } @@ -113,7 +116,8 @@ public static BlockingObjectQueue create(QueueFile qf) { queue.remove(); return peek; } catch (IOException e) { - throw new RuntimeException(e); + QueueFile.getSneakyThrowable(e); + return null; } finally { lock.unlock(); } @@ -207,7 +211,8 @@ public static BlockingObjectQueue create(QueueFile qf) { queue.remove(); return peek; } catch (IOException e) { - throw new RuntimeException(e); + QueueFile.getSneakyThrowable(e); + return null; } finally { lock.unlock(); } @@ -223,7 +228,8 @@ public static BlockingObjectQueue create(QueueFile qf) { queue.remove(); return peek; } catch (IOException e) { - throw new RuntimeException(e); + QueueFile.getSneakyThrowable(e); + return null; } finally { lock.unlock(); } @@ -237,7 +243,8 @@ public static BlockingObjectQueue create(QueueFile qf) { } return queue.peek(); } catch (IOException e) { - throw new RuntimeException(e); + QueueFile.getSneakyThrowable(e); + return null; } finally { lock.unlock(); } @@ -251,7 +258,8 @@ public static BlockingObjectQueue create(QueueFile qf) { } return queue.peek(); } catch (IOException e) { - throw new RuntimeException(e); + QueueFile.getSneakyThrowable(e); + return null; } finally { lock.unlock(); } @@ -280,7 +288,8 @@ public E peek(long timeout, TimeUnit unit) throws InterruptedException { } return peek; } catch (IOException e) { - throw new RuntimeException(e); + QueueFile.getSneakyThrowable(e); + return null; } finally { lock.unlock(); } @@ -357,7 +366,8 @@ public E peek(long timeout, TimeUnit unit) throws InterruptedException { nonEmpty.signal(); return true; } catch (IOException e) { - throw new RuntimeException(e); + QueueFile.getSneakyThrowable(e); + return false; } finally { lock.unlock(); } @@ -425,7 +435,7 @@ private static boolean contains(Iterable haystack, Object needle) { try { queue.clear(); } catch (IOException e) { - throw new RuntimeException(e); + QueueFile.getSneakyThrowable(e); } finally { lock.unlock(); } From 4f785e142f6603a9feefbad2d588a2ae0aab25aa Mon Sep 17 00:00:00 2001 From: Phillip Schichtel Date: Fri, 26 Oct 2018 02:06:04 +0200 Subject: [PATCH 5/8] Closes #198: Implement the BlockingQueue as a wrapper around the QueueFile Increased coverage and minor cleanup --- .../squareup/tape2/BlockingObjectQueue.java | 11 ++- .../tape2/BlockingObjectQueueTest.java | 93 +++++++++++++++++++ 2 files changed, 99 insertions(+), 5 deletions(-) diff --git a/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java b/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java index e3383a75..ffd008aa 100644 --- a/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java +++ b/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java @@ -27,10 +27,14 @@ public class BlockingObjectQueue implements BlockingQueue, Closeable { private final ObjectQueue queue; - private BlockingObjectQueue(ObjectQueue queue) { + public BlockingObjectQueue(ObjectQueue queue) { this.queue = queue; } + public ObjectQueue queue() { + return queue; + } + /** * Creates a new {@link BlockingQueue} of type {@code T} backed by the given * {@link ObjectQueue} of type {@code T}. @@ -90,7 +94,7 @@ public static BlockingObjectQueue create(QueueFile qf) { E peek = queue.peek(); if (peek == null) { // this won't happen unless the backing queue has been shared. - throw new IllegalStateException("Queue empty!"); + throw new IllegalStateException("backing queue empty"); } queue.remove(); return peek; @@ -132,9 +136,6 @@ public static BlockingObjectQueue create(QueueFile qf) { * head matches. * */ @Override public boolean remove(Object o) { - if (o == null) { - return false; - } lock.lock(); try { if (queue.isEmpty()) { diff --git a/tape/src/test/java/com/squareup/tape2/BlockingObjectQueueTest.java b/tape/src/test/java/com/squareup/tape2/BlockingObjectQueueTest.java index 32993be8..15b4100b 100644 --- a/tape/src/test/java/com/squareup/tape2/BlockingObjectQueueTest.java +++ b/tape/src/test/java/com/squareup/tape2/BlockingObjectQueueTest.java @@ -6,10 +6,13 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; @@ -126,6 +129,14 @@ private BlockingObjectQueue newQueue() throws IOException { assertEquals(1, q.size()); assertArrayEquals(new byte[]{2}, q.remove()); assertEquals(0, q.size()); + + assertFalse(q.remove(null)); + } + + @Test(expected = NoSuchElementException.class) + public void removeEmpty() throws IOException { + BlockingObjectQueue q = newQueue(); + q.remove(); } @Test public void contains() throws IOException { @@ -166,6 +177,8 @@ private BlockingObjectQueue newQueue() throws IOException { assertEquals(drain, out.size()); assertEquals(1, q.size()); assertArrayEquals(ref.subList(0, drain).toArray(), out.toArray()); + + assertEquals(0, q.drainTo(out, 0)); } @Test public void drainTo1TooSmall() throws IOException { @@ -337,6 +350,7 @@ public void peek1Empty() throws IOException, InterruptedException { assertArrayEquals(new byte[]{1}, q.remove()); assertArrayEquals(new byte[]{2}, q.remove()); assertTrue(q.isEmpty()); + assertFalse(q.addAll(Collections.emptyList())); } @Test public void removeAll() throws IOException { @@ -364,6 +378,20 @@ public void peek1Empty() throws IOException, InterruptedException { assertTrue(q.isEmpty()); } + @Test public void retainAllListEmpty() throws IOException { + List retainNone = Collections.emptyList(); + + BlockingObjectQueue empty = newQueue(); + assertFalse(empty.retainAll(retainNone)); + + BlockingObjectQueue nonEmpty = newQueue(); + assertTrue(nonEmpty.add(new byte[]{1})); + assertTrue(nonEmpty.add(new byte[]{2})); + assertTrue(nonEmpty.add(new byte[]{3})); + assertTrue(nonEmpty.retainAll(retainNone)); + assertTrue(nonEmpty.isEmpty()); + } + @Test public void clear() throws IOException { BlockingObjectQueue q = newQueue(); assertTrue(q.add(new byte[]{1})); @@ -372,4 +400,69 @@ public void peek1Empty() throws IOException, InterruptedException { q.clear(); assertEquals(0, q.size()); } + + @Test public void create() throws IOException { + QueueFile qf = newQueueFile(); + BlockingObjectQueue q = BlockingObjectQueue.create(qf, new NoOpConverter()); + assertTrue(q.queue() instanceof FileObjectQueue); + } + + @Test public void createByteArray() throws IOException { + QueueFile qf = newQueueFile(); + BlockingObjectQueue q = BlockingObjectQueue.create(qf); + assertTrue(q.queue() instanceof QueueFile); + } + + @Test public void close() throws IOException { + BlockingObjectQueue q = newQueue(); + assertTrue(q.queue() instanceof QueueFile); + QueueFile qf = (QueueFile) q.queue(); + q.close(); + assertTrue(qf.closed); + } + + @Test(expected = IOException.class) + public void ioException() { + BlockingObjectQueue q = new BlockingObjectQueue<>(new BrokenQueueFile()); + q.add(null); + q.remove(); + } + + private static final class BrokenQueueFile extends ObjectQueue { + public int size = 0; + + @Override public int size() { + return size; + } + + @Override public void add(Object entry) throws IOException { + throw new IOException("test"); + } + + @Nullable @Override public Object peek() throws IOException { + throw new IOException("test"); + } + + @Override public void remove(int n) throws IOException { + throw new IOException("test"); + } + + @Override public void close() throws IOException { + throw new IOException("test"); + } + + @Override public Iterator iterator() { + return null; + } + } + + private static final class NoOpConverter implements ObjectQueue.Converter { + @Override public Object from(byte[] source) { + return null; + } + + @Override public void toStream(Object value, OutputStream sink) { + + } + } } From 4ae94a200ebcdd30f2cdb01a0ab7e0b700bb0595 Mon Sep 17 00:00:00 2001 From: Phillip Schichtel Date: Fri, 26 Oct 2018 18:25:33 +0200 Subject: [PATCH 6/8] Closes #198: Implement the BlockingQueue as a wrapper around the QueueFile minor cleanup --- .../com/squareup/tape2/BlockingObjectQueue.java | 4 +++- .../main/java/com/squareup/tape2/ObjectQueue.java | 7 ++++++- .../main/java/com/squareup/tape2/QueueFile.java | 14 -------------- 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java b/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java index ffd008aa..01e70de1 100644 --- a/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java +++ b/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java @@ -94,7 +94,7 @@ public static BlockingObjectQueue create(QueueFile qf) { E peek = queue.peek(); if (peek == null) { // this won't happen unless the backing queue has been shared. - throw new IllegalStateException("backing queue empty"); + throw new NoSuchElementException(); } queue.remove(); return peek; @@ -115,6 +115,7 @@ public static BlockingObjectQueue create(QueueFile qf) { } E peek = queue.peek(); if (peek == null) { + // this won't happen unless the backing queue has been shared. throw new NoSuchElementException(); } queue.remove(); @@ -285,6 +286,7 @@ public E peek(long timeout, TimeUnit unit) throws InterruptedException { } E peek = queue.peek(); if (peek == null) { + // this won't happen unless the backing queue has been shared. throw new NoSuchElementException(); } return peek; diff --git a/tape/src/main/java/com/squareup/tape2/ObjectQueue.java b/tape/src/main/java/com/squareup/tape2/ObjectQueue.java index 01e6cb00..eb284864 100644 --- a/tape/src/main/java/com/squareup/tape2/ObjectQueue.java +++ b/tape/src/main/java/com/squareup/tape2/ObjectQueue.java @@ -8,6 +8,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; import javax.annotation.Nullable; /** A queue of objects. */ @@ -62,7 +63,11 @@ public List asList() throws IOException { return peek(size()); } - /** Removes the head of the queue. */ + /** + * Removes the eldest element (the head) of the queue. + * + * @throws NoSuchElementException if the queue is empty + */ public void remove() throws IOException { remove(1); } diff --git a/tape/src/main/java/com/squareup/tape2/QueueFile.java b/tape/src/main/java/com/squareup/tape2/QueueFile.java index 1e382dfa..efb17c9f 100644 --- a/tape/src/main/java/com/squareup/tape2/QueueFile.java +++ b/tape/src/main/java/com/squareup/tape2/QueueFile.java @@ -423,11 +423,6 @@ private long remainingBytes() { return fileLength - usedBytes(); } - /** Returns true if this queue contains no entries. */ - @Override public boolean isEmpty() { - return elementCount == 0; - } - /** * If necessary, expands the file to accommodate an additional element of the given length. * @@ -585,15 +580,6 @@ private void checkForComodification() { return elementCount; } - /** - * Removes the eldest element. - * - * @throws NoSuchElementException if the queue is empty - */ - @Override public void remove() throws IOException { - remove(1); - } - /** * Removes the eldest {@code n} elements. * From 46c3ddd6cca2223b86a0657dcd8bd29738606183 Mon Sep 17 00:00:00 2001 From: Phillip Schichtel Date: Sat, 3 Nov 2018 02:48:23 +0100 Subject: [PATCH 7/8] Closes #198: Implement the BlockingQueue as a wrapper around the QueueFile more cleanup and a new test --- .../squareup/tape2/BlockingObjectQueue.java | 42 +++++++------------ .../tape2/BlockingObjectQueueTest.java | 18 ++++++++ 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java b/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java index 01e70de1..d373521e 100644 --- a/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java +++ b/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java @@ -3,6 +3,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.Collection; +import java.util.ConcurrentModificationException; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Objects; @@ -14,12 +15,12 @@ /** * This is a simple and unbounded {@link BlockingQueue} implementation as a wrapper - * around a {@link QueueFile}. Thread safety is implemented using a single lock around all - * operations to the backing {@link QueueFile}. + * around a {@link ObjectQueue}. Thread safety is implemented using a single lock around all + * operations to the backing {@link ObjectQueue}. * * @param the element type */ -public class BlockingObjectQueue implements BlockingQueue, Closeable { +public final class BlockingObjectQueue implements BlockingQueue, Closeable { private final Lock lock = new ReentrantLock(); @@ -94,7 +95,7 @@ public static BlockingObjectQueue create(QueueFile qf) { E peek = queue.peek(); if (peek == null) { // this won't happen unless the backing queue has been shared. - throw new NoSuchElementException(); + throw new ConcurrentModificationException(); } queue.remove(); return peek; @@ -115,7 +116,6 @@ public static BlockingObjectQueue create(QueueFile qf) { } E peek = queue.peek(); if (peek == null) { - // this won't happen unless the backing queue has been shared. throw new NoSuchElementException(); } queue.remove(); @@ -133,7 +133,7 @@ public static BlockingObjectQueue create(QueueFile qf) { } /** - * The underlying {@link QueueFile} only supports removing the head, so this will only work if + * The underlying {@link ObjectQueue} only supports removing the head, so this will only work if * head matches. * */ @Override public boolean remove(Object o) { @@ -172,13 +172,7 @@ public static BlockingObjectQueue create(QueueFile qf) { @Override public int drainTo(Collection c) { lock.lock(); try { - int size = queue.size(); - Iterator it = queue.iterator(); - while (it.hasNext()) { - c.add(it.next()); - it.remove(); - } - return size; + return drainTo(c, size()); } finally { lock.unlock(); } @@ -255,9 +249,6 @@ public static BlockingObjectQueue create(QueueFile qf) { @Override public E peek() { lock.lock(); try { - if (queue.isEmpty()) { - return null; - } return queue.peek(); } catch (IOException e) { QueueFile.getSneakyThrowable(e); @@ -286,7 +277,6 @@ public E peek(long timeout, TimeUnit unit) throws InterruptedException { } E peek = queue.peek(); if (peek == null) { - // this won't happen unless the backing queue has been shared. throw new NoSuchElementException(); } return peek; @@ -317,12 +307,7 @@ public E peek(long timeout, TimeUnit unit) throws InterruptedException { } @Override public Iterator iterator() { - lock.lock(); - try { - return queue.iterator(); - } finally { - lock.unlock(); - } + return queue.iterator(); } @Override public Object[] toArray() { @@ -377,7 +362,7 @@ public E peek(long timeout, TimeUnit unit) throws InterruptedException { } /** - * The underlying {@link QueueFile} only supports removing the head, so this will only work if + * The underlying {@link ObjectQueue} only supports removing the head, so this will only work if * head matches. * */ @Override public boolean removeAll(Collection c) { @@ -405,7 +390,7 @@ private static boolean contains(Iterable haystack, Object needle) { } /** - * The underlying {@link QueueFile} only supports removing the head, so this will only work if + * The underlying {@link ObjectQueue} only supports removing the head, so this will only work if * head matches. * */ @Override public boolean retainAll(Collection c) { @@ -445,6 +430,11 @@ private static boolean contains(Iterable haystack, Object needle) { } @Override public void close() throws IOException { - queue.close(); + lock.lock(); + try { + queue.close(); + } finally { + lock.unlock(); + } } } diff --git a/tape/src/test/java/com/squareup/tape2/BlockingObjectQueueTest.java b/tape/src/test/java/com/squareup/tape2/BlockingObjectQueueTest.java index 15b4100b..0abaed39 100644 --- a/tape/src/test/java/com/squareup/tape2/BlockingObjectQueueTest.java +++ b/tape/src/test/java/com/squareup/tape2/BlockingObjectQueueTest.java @@ -234,6 +234,24 @@ public void poll1Empty() throws IOException, InterruptedException { q.poll(1, TimeUnit.SECONDS); } + @Test(timeout = 1000) + public void poll1EmptyAndAdd() throws IOException, InterruptedException { + final BlockingObjectQueue q = newQueue(); + Thread t = new Thread(new Runnable() { + @Override public void run() { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + q.add(new byte[]{1}); + } + }); + t.start(); + assertArrayEquals(new byte[] {1}, q.poll(1, TimeUnit.MINUTES)); + t.join(); + } + @Test public void poll1NonEmpty() throws IOException, InterruptedException { BlockingObjectQueue q = newQueue(); q.put(new byte[]{1, 2}); From f6c2a56c83f99d97ad806e5a6d5657f976d83f2b Mon Sep 17 00:00:00 2001 From: Phillip Schichtel Date: Sat, 3 Nov 2018 22:08:32 +0100 Subject: [PATCH 8/8] Closes #198: Implement the BlockingQueue as a wrapper around the QueueFile directly throw the sneaky exceptions --- .../squareup/tape2/BlockingObjectQueue.java | 29 +++++++------------ 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java b/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java index d373521e..26245d05 100644 --- a/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java +++ b/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java @@ -67,8 +67,7 @@ public static BlockingObjectQueue create(QueueFile qf) { nonEmpty.signal(); return true; } catch (IOException e) { - QueueFile.getSneakyThrowable(e); - return false; + throw QueueFile.getSneakyThrowable(e); } finally { lock.unlock(); } @@ -100,8 +99,7 @@ public static BlockingObjectQueue create(QueueFile qf) { queue.remove(); return peek; } catch (IOException e) { - QueueFile.getSneakyThrowable(e); - return null; + throw QueueFile.getSneakyThrowable(e); } finally { lock.unlock(); } @@ -121,8 +119,7 @@ public static BlockingObjectQueue create(QueueFile qf) { queue.remove(); return peek; } catch (IOException e) { - QueueFile.getSneakyThrowable(e); - return null; + throw QueueFile.getSneakyThrowable(e); } finally { lock.unlock(); } @@ -207,8 +204,7 @@ public static BlockingObjectQueue create(QueueFile qf) { queue.remove(); return peek; } catch (IOException e) { - QueueFile.getSneakyThrowable(e); - return null; + throw QueueFile.getSneakyThrowable(e); } finally { lock.unlock(); } @@ -224,8 +220,7 @@ public static BlockingObjectQueue create(QueueFile qf) { queue.remove(); return peek; } catch (IOException e) { - QueueFile.getSneakyThrowable(e); - return null; + throw QueueFile.getSneakyThrowable(e); } finally { lock.unlock(); } @@ -239,8 +234,7 @@ public static BlockingObjectQueue create(QueueFile qf) { } return queue.peek(); } catch (IOException e) { - QueueFile.getSneakyThrowable(e); - return null; + throw QueueFile.getSneakyThrowable(e); } finally { lock.unlock(); } @@ -251,8 +245,7 @@ public static BlockingObjectQueue create(QueueFile qf) { try { return queue.peek(); } catch (IOException e) { - QueueFile.getSneakyThrowable(e); - return null; + throw QueueFile.getSneakyThrowable(e); } finally { lock.unlock(); } @@ -281,8 +274,7 @@ public E peek(long timeout, TimeUnit unit) throws InterruptedException { } return peek; } catch (IOException e) { - QueueFile.getSneakyThrowable(e); - return null; + throw QueueFile.getSneakyThrowable(e); } finally { lock.unlock(); } @@ -354,8 +346,7 @@ public E peek(long timeout, TimeUnit unit) throws InterruptedException { nonEmpty.signal(); return true; } catch (IOException e) { - QueueFile.getSneakyThrowable(e); - return false; + throw QueueFile.getSneakyThrowable(e); } finally { lock.unlock(); } @@ -423,7 +414,7 @@ private static boolean contains(Iterable haystack, Object needle) { try { queue.clear(); } catch (IOException e) { - QueueFile.getSneakyThrowable(e); + throw QueueFile.getSneakyThrowable(e); } finally { lock.unlock(); }