Skip to content

Commit

Permalink
Prevent blocking on queue overflow (census-instrumentation#1809)
Browse files Browse the repository at this point in the history
  • Loading branch information
saiya committed May 16, 2019
1 parent 8312f0e commit f4c94aa
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 7 deletions.
1 change: 1 addition & 0 deletions buildscripts/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ General guidelines on imports:
<allow pkg="io.opencensus.trace"/>
</subpackage>
<subpackage name="impl">
<allow pkg="com.google.common"/>
<allow pkg="com.lmax.disruptor"/>
<allow pkg="io.opencensus.common"/>
<allow pkg="io.opencensus.impl"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@

package io.opencensus.impl.internal;

import com.google.common.annotations.VisibleForTesting;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import io.opencensus.implcore.internal.DaemonThreadFactory;
import io.opencensus.implcore.internal.EventQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -96,7 +99,7 @@ public final class DisruptorEventQueue implements EventQueue {
// TODO(aveitch): consider making this a parameter to the constructor, so the queue can be
// configured to a size appropriate to the system (smaller/less busy systems will not need as
// large a queue.
private static final int DISRUPTOR_BUFFER_SIZE = 8192;
@VisibleForTesting static final int DISRUPTOR_BUFFER_SIZE = 8192;
// The single instance of the class.
private static final DisruptorEventQueue eventQueue = create();

Expand All @@ -105,6 +108,8 @@ public final class DisruptorEventQueue implements EventQueue {

private volatile DisruptorEnqueuer enqueuer;

@VisibleForTesting static final AtomicLong overflowCount = new AtomicLong();

// Creates a new EventQueue. Private to prevent creation of non-singleton instance.
private DisruptorEventQueue(Disruptor<DisruptorEvent> disruptor, DisruptorEnqueuer enqueuer) {
this.disruptor = disruptor;
Expand All @@ -131,7 +136,19 @@ private static DisruptorEventQueue create() {
new DisruptorEnqueuer() {
@Override
public void enqueue(Entry entry) {
long sequence = ringBuffer.next();
long sequence;
try {
sequence = ringBuffer.tryNext();
} catch (InsufficientCapacityException e) {
// Show warning log only once to prevent blocking caused by logger.
if (overflowCount.getAndIncrement() == 0) {
logger.log(
Level.WARNING,
"Dropping some events due to queue overflow."
+ " Consider to reduce sampling rate.");
}
return;
}
try {
DisruptorEvent event = ringBuffer.get(sequence);
event.setEntry(entry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,65 @@
import static com.google.common.truth.Truth.assertThat;

import io.opencensus.implcore.internal.EventQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Unit tests for {@link DisruptorEventQueue}. */
@RunWith(JUnit4.class)
public class DisruptorEventQueueTest {

// Simple class to use that keeps an incrementing counter. Will fail with an assertion if
// increment is used from multiple threads, or if the stored value is different from that expected
// by the caller.
private static class Counter {
private int count;

@Nullable private final CountDownLatch latch;
private final AtomicInteger count = new AtomicInteger();
private volatile long id; // stores thread ID used in first increment operation.

public Counter() {
count = 0;
this(null);
}

public Counter(@Nullable CountDownLatch latch) {
this.latch = latch;
id = -1;
}

// Increments counter by 1. Will fail in assertion if multiple different threads are used
// (the EventQueue backend should be single-threaded).
public void increment() {
if (latch != null) {
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

long tid = Thread.currentThread().getId();
if (id == -1) {
assertThat(count).isEqualTo(0);
assertThat(count.get()).isEqualTo(0);
id = tid;
} else {
assertThat(id).isEqualTo(tid);
}
count++;
count.incrementAndGet();
}

// Check the current value of the counter. Assert if it is not the expected value.
public void check(int value) {
assertThat(count).isEqualTo(value);
assertThat(count.get()).isEqualTo(value);
}
}

// EventQueueEntry for incrementing a Counter.
private static class IncrementEvent implements EventQueue.Entry {

private final Counter counter;

IncrementEvent(Counter counter) {
Expand Down Expand Up @@ -88,10 +107,20 @@ public void incrementOnce() {
@Test
public void incrementTenK() {
final int tenK = 10000;
final int sleepEach = 1000;
Counter counter = new Counter();
for (int i = 0; i < tenK; i++) {
IncrementEvent ie = new IncrementEvent(counter);
DisruptorEventQueue.getInstance().enqueue(ie);

// Sleep to prevent queue overflow
if (i % sleepEach == 0) {
try {
Thread.sleep(10);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
// Sleep briefly, to allow background operations to complete.
try {
Expand All @@ -101,4 +130,32 @@ public void incrementTenK() {
}
counter.check(tenK);
}

@Test(timeout = 30 * 1000)
public void shouldNotBlockWhenOverflow() {
long overflowCountBefore = DisruptorEventQueue.overflowCount.get();

// Queue blocking events to fill queue
CountDownLatch latch = new CountDownLatch(1);
Counter counter = new Counter(latch);
for (int i = 0; i < DisruptorEventQueue.DISRUPTOR_BUFFER_SIZE; i++) {
IncrementEvent ie = new IncrementEvent(counter);
DisruptorEventQueue.getInstance().enqueue(ie);
}
counter.check(0);

// Queue event into filled queue to test overflow behavior
IncrementEvent ie = new IncrementEvent(counter);
DisruptorEventQueue.getInstance().enqueue(ie);
assertThat(DisruptorEventQueue.overflowCount.get()).isEqualTo(overflowCountBefore + 1);

// Cleanup events
latch.countDown();
try {
Thread.sleep(500);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
counter.check(DisruptorEventQueue.DISRUPTOR_BUFFER_SIZE);
}
}

0 comments on commit f4c94aa

Please sign in to comment.