diff --git a/tape/src/main/java/com/squareup/tape2/BlockingFileQueue.java b/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java similarity index 69% rename from tape/src/main/java/com/squareup/tape2/BlockingFileQueue.java rename to tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java index 745fe1bf..7fceeaf5 100644 --- a/tape/src/main/java/com/squareup/tape2/BlockingFileQueue.java +++ b/tape/src/main/java/com/squareup/tape2/BlockingObjectQueue.java @@ -1,10 +1,10 @@ package com.squareup.tape2; import java.io.IOException; -import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; @@ -12,27 +12,37 @@ import java.util.concurrent.locks.ReentrantLock; /** - * This is a simple and unbounded @{@link BlockingQueue} implementation as a wrapper around a @{@link QueueFile}. - * Thread safety is implemented using a single lock around all operations to the backing @{@link QueueFile}. + * This is a simple and unbounded {@link BlockingQueue} implementation as a wrapper + * around a {@link QueueFile}. Thread safety is implemented using a single lock around all + * operations to the backing {@link QueueFile}. */ -public class BlockingFileQueue implements BlockingQueue { +public class BlockingObjectQueue implements BlockingQueue { private final Lock lock = new ReentrantLock(); private final Condition nonEmpty = lock.newCondition(); - private final QueueFile queue; + private final ObjectQueue queue; /** - * Creates a new @{@link BlockingQueue} of type {@code byte[]} backed by the given @{@link QueueFile}. + * Creates a new {@link BlockingQueue} of type {@code E} backed by the given + * {@link ObjectQueue} of type {@code E}. * * @param queue the queue file which should not be shared to other places * */ - public BlockingFileQueue(QueueFile queue) { + public BlockingObjectQueue(ObjectQueue queue) { this.queue = queue; } - @Override public boolean add(byte[] bytes) { + public static BlockingObjectQueue create(QueueFile qf, ObjectQueue.Converter conv) { + return new BlockingObjectQueue<>(new FileObjectQueue<>(qf, conv)); + } + + public static BlockingObjectQueue create(QueueFile qf) { + return create(qf, ByteArrayConverter.INSTANCE); + } + + @Override public boolean add(E bytes) { lock.lock(); try { queue.add(bytes); @@ -45,25 +55,25 @@ public BlockingFileQueue(QueueFile queue) { } } - @Override public boolean offer(byte[] bytes) { + @Override public boolean offer(E bytes) { return add(bytes); } - @Override public void put(byte[] bytes) { + @Override public void put(E bytes) { add(bytes); } - @Override public boolean offer(byte[] bytes, long timeout, TimeUnit unit) { + @Override public boolean offer(E bytes, long timeout, TimeUnit unit) { return add(bytes); } - @Override public byte[] take() throws InterruptedException { + @Override public E take() throws InterruptedException { lock.lock(); try { while (queue.isEmpty()) { nonEmpty.await(); } - byte[] peek = queue.peek(); + E peek = queue.peek(); if (peek == null) { throw new IllegalStateException("Queue empty!"); } @@ -76,14 +86,14 @@ public BlockingFileQueue(QueueFile queue) { } } - @Override public byte[] poll(long timeout, TimeUnit unit) throws InterruptedException { + @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException { lock.lock(); try { long timeoutNanos = unit.toNanos(timeout); while (queue.isEmpty() && timeoutNanos > 0) { timeoutNanos = nonEmpty.awaitNanos(timeoutNanos); } - byte[] peek = queue.peek(); + E peek = queue.peek(); if (peek == null) { throw new NoSuchElementException(); } @@ -100,23 +110,22 @@ public BlockingFileQueue(QueueFile queue) { return Integer.MAX_VALUE; // as per BlockingQueue javadoc for unbounded queues } - /** The backing @{@link QueueFile} only supports removing the head, so this will only work if head matches. */ + /** + * The underlying {@link QueueFile} only supports removing the head, so this will only work if + * head matches. + * */ @Override public boolean remove(Object o) { if (o == null) { return false; } - if (o.getClass() != byte[].class && o.getClass() != Byte[].class) { - return false; - } - byte[] remove = (byte[])o; lock.lock(); try { if (queue.isEmpty()) { return false; } - Iterator it = queue.iterator(); + Iterator it = queue.iterator(); while (it.hasNext()) { - if (Arrays.equals(it.next(), remove)) { + if (Objects.deepEquals(it.next(), o)) { it.remove(); return true; } @@ -128,17 +137,10 @@ public BlockingFileQueue(QueueFile queue) { } @Override public boolean contains(Object o) { - if (o == null) { - return false; - } - if (o.getClass() != byte[].class && o.getClass() != Byte[].class) { - return false; - } - byte[] check = (byte[])o; lock.lock(); try { - for (byte[] entry : queue) { - if (Arrays.equals(entry, check)) { + for (E entry : queue) { + if (Objects.deepEquals(entry, o)) { return true; } } @@ -148,11 +150,11 @@ public BlockingFileQueue(QueueFile queue) { } } - @Override public int drainTo(Collection c) { + @Override public int drainTo(Collection c) { lock.lock(); try { int size = queue.size(); - Iterator it = queue.iterator(); + Iterator it = queue.iterator(); while (it.hasNext()) { c.add(it.next()); it.remove(); @@ -163,13 +165,13 @@ public BlockingFileQueue(QueueFile queue) { } } - @Override public int drainTo(Collection c, int maxElements) { + @Override public int drainTo(Collection c, int maxElements) { if (maxElements == 0) { return 0; } lock.lock(); try { - Iterator it = queue.iterator(); + Iterator it = queue.iterator(); int i = 0; while (it.hasNext() && i < maxElements) { c.add(it.next()); @@ -182,13 +184,13 @@ public BlockingFileQueue(QueueFile queue) { } } - @Override public byte[] remove() { + @Override public E remove() { lock.lock(); try { if (queue.isEmpty()) { throw new NoSuchElementException(); } - byte[] peek = queue.peek(); + E peek = queue.peek(); queue.remove(); return peek; } catch (IOException e) { @@ -198,13 +200,13 @@ public BlockingFileQueue(QueueFile queue) { } } - @Override public byte[] poll() { + @Override public E poll() { lock.lock(); try { if (queue.isEmpty()) { return null; } - byte[] peek = queue.peek(); + E peek = queue.peek(); queue.remove(); return peek; } catch (IOException e) { @@ -214,7 +216,7 @@ public BlockingFileQueue(QueueFile queue) { } } - @Override public byte[] element() { + @Override public E element() { lock.lock(); try { if (queue.isEmpty()) { @@ -228,7 +230,7 @@ public BlockingFileQueue(QueueFile queue) { } } - @Override public byte[] peek() { + @Override public E peek() { lock.lock(); try { if (queue.isEmpty()) { @@ -243,8 +245,8 @@ public BlockingFileQueue(QueueFile queue) { } /** - * This overload is an addition to the @{@link BlockingQueue} interface similar to the {@link #poll(long, TimeUnit)} - * method, a blocking peek operation with a timeout. + * This overload is an addition to the {@link BlockingQueue} interface similar to the + * {@link #poll(long, TimeUnit)} method, a blocking peek operation with a timeout. * * @param timeout the timeout * @param unit the time unit of the timeout @@ -252,14 +254,14 @@ public BlockingFileQueue(QueueFile queue) { * @see #peek() for more information * @throws InterruptedException if interrupted while waiting */ - public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException { + public E peek(long timeout, TimeUnit unit) throws InterruptedException { lock.lock(); try { long timeoutNanos = unit.toNanos(timeout); while (queue.isEmpty() && timeoutNanos > 0) { timeoutNanos = nonEmpty.awaitNanos(timeoutNanos); } - byte[] peek = queue.peek(); + E peek = queue.peek(); if (peek == null) { throw new NoSuchElementException(); } @@ -289,7 +291,7 @@ public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException { } } - @Override public Iterator iterator() { + @Override public Iterator iterator() { lock.lock(); try { return queue.iterator(); @@ -303,7 +305,7 @@ public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException { try { Object[] out = new Object[queue.size()]; int i = 0; - for (byte[] e : queue) { + for (E e : queue) { out[i++] = e; } return out; @@ -330,13 +332,13 @@ public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException { } } - @Override public boolean addAll(Collection c) { + @Override public boolean addAll(Collection c) { if (c.isEmpty()) { return false; } lock.lock(); try { - for (byte[] e : c) { + for (E e : c) { queue.add(e); } nonEmpty.signal(); @@ -348,7 +350,10 @@ public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException { } } - /** The backing @{@link QueueFile} only supports removing the head, so this will only work if head matches. */ + /** + * The underlying {@link QueueFile} only supports removing the head, so this will only work if + * head matches. + * */ @Override public boolean removeAll(Collection c) { lock.lock(); boolean changed = false; @@ -364,17 +369,19 @@ public byte[] peek(long timeout, TimeUnit unit) throws InterruptedException { } } - private static boolean arrayContained(Collection haystack, byte[] needle) { + private static boolean contains(Iterable haystack, Object needle) { for (Object o : haystack) { - byte[] byteArray = (byte[])o; - if (Arrays.equals(byteArray, needle)) { + if (Objects.deepEquals(o, needle)) { return true; } } return false; } - /** The backing @{@link QueueFile} only supports removing the head, so this will only work if head matches. */ + /** + * The underlying {@link QueueFile} only supports removing the head, so this will only work if + * head matches. + * */ @Override public boolean retainAll(Collection c) { lock.lock(); if (c.isEmpty()) { @@ -387,9 +394,9 @@ private static boolean arrayContained(Collection haystack, byte[] needle) { } try { boolean changed = false; - Iterator it = queue.iterator(); + Iterator it = queue.iterator(); while (it.hasNext()) { - if (!arrayContained(c, it.next())) { + if (!contains(c, it.next())) { it.remove(); changed = true; } diff --git a/tape/src/main/java/com/squareup/tape2/ByteArrayConverter.java b/tape/src/main/java/com/squareup/tape2/ByteArrayConverter.java new file mode 100644 index 00000000..c568a9af --- /dev/null +++ b/tape/src/main/java/com/squareup/tape2/ByteArrayConverter.java @@ -0,0 +1,20 @@ +package com.squareup.tape2; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * This is a simple "NO-OP" converter for byte[] queues which don't actually need a conversion. + */ +public class ByteArrayConverter implements ObjectQueue.Converter { + + public static final ByteArrayConverter INSTANCE = new ByteArrayConverter(); + + @Override public byte[] from(byte[] source) { + return source; + } + + @Override public void toStream(byte[] value, OutputStream sink) throws IOException { + sink.write(value); + } +} diff --git a/tape/src/test/java/com/squareup/tape2/BlockingFileQueueTest.java b/tape/src/test/java/com/squareup/tape2/BlockingObjectQueueTest.java similarity index 84% rename from tape/src/test/java/com/squareup/tape2/BlockingFileQueueTest.java rename to tape/src/test/java/com/squareup/tape2/BlockingObjectQueueTest.java index 2c7f04d8..af28a5e0 100644 --- a/tape/src/test/java/com/squareup/tape2/BlockingFileQueueTest.java +++ b/tape/src/test/java/com/squareup/tape2/BlockingObjectQueueTest.java @@ -13,7 +13,7 @@ import static org.junit.Assert.*; -public class BlockingFileQueueTest { +public class BlockingObjectQueueTest { @Rule public TemporaryFolder folder = new TemporaryFolder(); private File file; @@ -27,12 +27,12 @@ private QueueFile newQueueFile() throws IOException { return new QueueFile.Builder(file).build(); } - private BlockingFileQueue newQueue() throws IOException { - return new BlockingFileQueue(newQueueFile()); + private BlockingObjectQueue newQueue() throws IOException { + return BlockingObjectQueue.create(newQueueFile()); } @Test public void add() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); q.add(new byte[]{1, 2}); assertArrayEquals(new byte[]{1, 2}, q.peek()); @@ -40,7 +40,7 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void offer() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); q.offer(new byte[]{1, 2}); assertArrayEquals(new byte[]{1, 2}, q.peek()); @@ -48,7 +48,7 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void put() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); q.put(new byte[]{1, 2}); assertArrayEquals(new byte[]{1, 2}, q.peek()); @@ -56,7 +56,7 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void offer1() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); q.offer(new byte[]{1, 2}, 1, TimeUnit.SECONDS); assertArrayEquals(new byte[]{1, 2}, q.peek()); @@ -64,7 +64,7 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void takeEmpty() throws IOException, InterruptedException { - final BlockingFileQueue q = newQueue(); + final BlockingObjectQueue q = newQueue(); Thread t = new Thread(new Runnable() { @Override public void run() { try { @@ -81,24 +81,24 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void takeNonEmpty() throws IOException, InterruptedException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); q.put(new byte[]{1, 2}); assertArrayEquals(new byte[]{1, 2}, q.take()); } @Test public void pollEmpty() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertNull(q.poll()); } @Test public void pollNonEmpty() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); q.put(new byte[]{1, 2}); assertArrayEquals(new byte[]{1, 2}, q.poll()); } @Test public void remainingCapacity() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); assertEquals(Integer.MAX_VALUE, q.remainingCapacity()); q.add(new byte[]{}); @@ -107,7 +107,7 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void remove() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); assertTrue(q.add(new byte[]{1})); assertEquals(1, q.size()); @@ -120,7 +120,7 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void contains() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); assertTrue(q.add(new byte[]{1})); assertEquals(1, q.size()); @@ -133,7 +133,7 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void drainTo() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); List ref = Arrays.asList(new byte[]{1}, new byte[]{2}); assertTrue(q.addAll(ref)); assertEquals(ref.size(), q.size()); @@ -146,7 +146,7 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void drainTo1() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); List ref = Arrays.asList(new byte[]{1}, new byte[]{2}, new byte[]{3}); assertTrue(q.addAll(ref)); assertEquals(ref.size(), q.size()); @@ -160,7 +160,7 @@ private BlockingFileQueue newQueue() throws IOException { } @Test public void drainTo1TooSmall() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); List ref = Arrays.asList(new byte[]{1}, new byte[]{2}); assertTrue(q.addAll(ref)); assertEquals(ref.size(), q.size()); @@ -176,7 +176,7 @@ private BlockingFileQueue newQueue() throws IOException { byte[] o1 = {1}; byte[] o2 = {2}; - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); assertTrue(q.add(o1)); assertEquals(1, q.size()); @@ -196,7 +196,7 @@ public void remove1NotHead() throws IOException { byte[] o1 = {1}; byte[] o2 = {2}; - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); assertTrue(q.add(o1)); assertEquals(1, q.size()); @@ -208,36 +208,36 @@ public void remove1NotHead() throws IOException { @Test(expected = NoSuchElementException.class) public void poll1Empty() throws IOException, InterruptedException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); q.poll(1, TimeUnit.SECONDS); } @Test public void poll1NonEmpty() throws IOException, InterruptedException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); q.put(new byte[]{1, 2}); assertArrayEquals(new byte[]{1, 2}, q.poll(1, TimeUnit.SECONDS)); } @Test(expected = NoSuchElementException.class) public void elementEmpty() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); q.element(); } @Test public void elementNonEmpty() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertTrue(q.add(new byte[] {1})); assertEquals(1, q.size()); assertArrayEquals(new byte[] {1}, q.element()); } @Test public void peekEmpty() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertNull(q.peek()); } @Test public void peekNonEmpty() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertTrue(q.add(new byte[] {1})); assertEquals(1, q.size()); assertArrayEquals(new byte[] {1}, q.peek()); @@ -245,19 +245,19 @@ public void elementEmpty() throws IOException { @Test(expected = NoSuchElementException.class) public void peek1Empty() throws IOException, InterruptedException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); q.peek(1, TimeUnit.SECONDS); } @Test public void peek1NonEmpty() throws IOException, InterruptedException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertTrue(q.add(new byte[] {1})); assertEquals(1, q.size()); assertArrayEquals(new byte[] {1}, q.peek(1, TimeUnit.SECONDS)); } @Test public void size() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertEquals(0, q.size()); assertTrue(q.add(new byte[] {1})); assertEquals(1, q.size());; @@ -266,7 +266,7 @@ public void peek1Empty() throws IOException, InterruptedException { } @Test public void isEmpty() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertTrue(q.isEmpty()); assertTrue(q.add(new byte[] {1})); assertFalse(q.isEmpty()); @@ -276,7 +276,7 @@ public void peek1Empty() throws IOException, InterruptedException { @Test public void iterator() throws IOException { byte[][] ref = {{1}, {2}, {3}}; - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); for (byte[] b : ref) { q.add(b); } @@ -292,7 +292,7 @@ public void peek1Empty() throws IOException, InterruptedException { @Test public void toArray() throws IOException { byte[][] bytes = {{1}, {2}}; - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertTrue(q.add(new byte[] {1})); assertTrue(q.add(new byte[] {2})); @@ -301,7 +301,7 @@ public void peek1Empty() throws IOException, InterruptedException { @Test public void toArray1() throws IOException { byte[][] bytes = {{1}, {2}}; - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertTrue(q.add(new byte[] {1})); assertTrue(q.add(new byte[] {2})); @@ -310,7 +310,7 @@ public void peek1Empty() throws IOException, InterruptedException { @Test public void containsAll() throws IOException { List bytes = Arrays.asList(new byte[]{1}, new byte[]{2}); - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertFalse(q.containsAll(bytes)); assertTrue(q.add(new byte[] {1})); assertEquals(1, q.size()); @@ -324,7 +324,7 @@ public void peek1Empty() throws IOException, InterruptedException { } @Test public void addAll() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertTrue(q.addAll(Arrays.asList(new byte[] {1}, new byte[] {2}))); assertArrayEquals(new byte[] {1}, q.remove()); assertArrayEquals(new byte[] {2}, q.remove()); @@ -332,7 +332,7 @@ public void peek1Empty() throws IOException, InterruptedException { } @Test public void removeAll() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertTrue(q.add(new byte[] {1})); assertTrue(q.add(new byte[] {2})); assertTrue(q.add(new byte[] {3})); @@ -344,7 +344,7 @@ public void peek1Empty() throws IOException, InterruptedException { } @Test public void retainAll() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertTrue(q.add(new byte[] {1})); assertTrue(q.add(new byte[] {2})); assertTrue(q.add(new byte[] {3})); @@ -357,7 +357,7 @@ public void peek1Empty() throws IOException, InterruptedException { } @Test public void clear() throws IOException { - BlockingFileQueue q = newQueue(); + BlockingObjectQueue q = newQueue(); assertTrue(q.add(new byte[] {1})); assertTrue(q.add(new byte[] {2})); assertEquals(2, q.size());