From 4bc39b656b8ffc0deca89c8a220475e9379b7eda Mon Sep 17 00:00:00 2001 From: Seiya Yazaki Date: Tue, 9 Apr 2019 11:46:13 +0900 Subject: [PATCH] Prevent blocking on queue overflow (#1809) --- buildscripts/import-control.xml | 1 + .../impl/internal/DisruptorEventQueue.java | 21 +++++- .../internal/DisruptorEventQueueTest.java | 67 +++++++++++++++++-- 3 files changed, 82 insertions(+), 7 deletions(-) diff --git a/buildscripts/import-control.xml b/buildscripts/import-control.xml index 1aafb5bf06..aaf560879a 100644 --- a/buildscripts/import-control.xml +++ b/buildscripts/import-control.xml @@ -269,6 +269,7 @@ General guidelines on imports: + diff --git a/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java b/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java index 59d41b5b0c..9abea915b6 100644 --- a/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java +++ b/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java @@ -16,8 +16,10 @@ package io.opencensus.impl.internal; +import com.google.common.annotations.VisibleForTesting; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.InsufficientCapacityException; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SleepingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; @@ -25,6 +27,7 @@ import io.opencensus.implcore.internal.DaemonThreadFactory; import io.opencensus.implcore.internal.EventQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -96,7 +99,7 @@ public final class DisruptorEventQueue implements EventQueue { // TODO(aveitch): consider making this a parameter to the constructor, so the queue can be // configured to a size appropriate to the system (smaller/less busy systems will not need as // large a queue. - private static final int DISRUPTOR_BUFFER_SIZE = 8192; + @VisibleForTesting static final int DISRUPTOR_BUFFER_SIZE = 8192; // The single instance of the class. private static final DisruptorEventQueue eventQueue = create(); @@ -105,6 +108,8 @@ public final class DisruptorEventQueue implements EventQueue { private volatile DisruptorEnqueuer enqueuer; + @VisibleForTesting static final AtomicLong overflowCount = new AtomicLong(); + // Creates a new EventQueue. Private to prevent creation of non-singleton instance. private DisruptorEventQueue(Disruptor disruptor, DisruptorEnqueuer enqueuer) { this.disruptor = disruptor; @@ -131,7 +136,19 @@ private static DisruptorEventQueue create() { new DisruptorEnqueuer() { @Override public void enqueue(Entry entry) { - long sequence = ringBuffer.next(); + long sequence; + try { + sequence = ringBuffer.tryNext(); + } catch (InsufficientCapacityException e) { + // Show warning log only once to prevent blocking caused by logger. + if (overflowCount.getAndIncrement() == 0) { + logger.log( + Level.WARNING, + "Dropping some events due to queue overflow." + + " Consider to reduce sampling rate."); + } + return; + } try { DisruptorEvent event = ringBuffer.get(sequence); event.setEntry(entry); diff --git a/impl/src/test/java/io/opencensus/impl/internal/DisruptorEventQueueTest.java b/impl/src/test/java/io/opencensus/impl/internal/DisruptorEventQueueTest.java index f12498fa46..2107717cd3 100644 --- a/impl/src/test/java/io/opencensus/impl/internal/DisruptorEventQueueTest.java +++ b/impl/src/test/java/io/opencensus/impl/internal/DisruptorEventQueueTest.java @@ -19,6 +19,9 @@ import static com.google.common.truth.Truth.assertThat; import io.opencensus.implcore.internal.EventQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -26,39 +29,55 @@ /** Unit tests for {@link DisruptorEventQueue}. */ @RunWith(JUnit4.class) public class DisruptorEventQueueTest { + // Simple class to use that keeps an incrementing counter. Will fail with an assertion if // increment is used from multiple threads, or if the stored value is different from that expected // by the caller. private static class Counter { - private int count; + + @Nullable private final CountDownLatch latch; + private final AtomicInteger count = new AtomicInteger(); private volatile long id; // stores thread ID used in first increment operation. public Counter() { - count = 0; + this(null); + } + + public Counter(@Nullable CountDownLatch latch) { + this.latch = latch; id = -1; } // Increments counter by 1. Will fail in assertion if multiple different threads are used // (the EventQueue backend should be single-threaded). public void increment() { + if (latch != null) { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + long tid = Thread.currentThread().getId(); if (id == -1) { - assertThat(count).isEqualTo(0); + assertThat(count.get()).isEqualTo(0); id = tid; } else { assertThat(id).isEqualTo(tid); } - count++; + count.incrementAndGet(); } // Check the current value of the counter. Assert if it is not the expected value. public void check(int value) { - assertThat(count).isEqualTo(value); + assertThat(count.get()).isEqualTo(value); } } // EventQueueEntry for incrementing a Counter. private static class IncrementEvent implements EventQueue.Entry { + private final Counter counter; IncrementEvent(Counter counter) { @@ -88,10 +107,20 @@ public void incrementOnce() { @Test public void incrementTenK() { final int tenK = 10000; + final int sleepEach = 1000; Counter counter = new Counter(); for (int i = 0; i < tenK; i++) { IncrementEvent ie = new IncrementEvent(counter); DisruptorEventQueue.getInstance().enqueue(ie); + + // Sleep to prevent queue overflow + if (i % sleepEach == 0) { + try { + Thread.sleep(10); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } } // Sleep briefly, to allow background operations to complete. try { @@ -101,4 +130,32 @@ public void incrementTenK() { } counter.check(tenK); } + + @Test(timeout = 30 * 1000) + public void shouldNotBlockWhenOverflow() { + long overflowCountBefore = DisruptorEventQueue.overflowCount.get(); + + // Queue blocking events to fill queue + CountDownLatch latch = new CountDownLatch(1); + Counter counter = new Counter(latch); + for (int i = 0; i < DisruptorEventQueue.DISRUPTOR_BUFFER_SIZE; i++) { + IncrementEvent ie = new IncrementEvent(counter); + DisruptorEventQueue.getInstance().enqueue(ie); + } + counter.check(0); + + // Queue event into filled queue to test overflow behavior + IncrementEvent ie = new IncrementEvent(counter); + DisruptorEventQueue.getInstance().enqueue(ie); + assertThat(DisruptorEventQueue.overflowCount.get()).isEqualTo(overflowCountBefore + 1); + + // Cleanup events + latch.countDown(); + try { + Thread.sleep(500); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + counter.check(DisruptorEventQueue.DISRUPTOR_BUFFER_SIZE); + } }