Skip to content
This repository has been archived by the owner on Dec 23, 2023. It is now read-only.

Prevent blocking on queue overflow (#1809) #1837

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions buildscripts/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ General guidelines on imports:
<allow pkg="io.opencensus.trace"/>
</subpackage>
<subpackage name="impl">
<allow pkg="com.google.common"/>
<allow pkg="com.lmax.disruptor"/>
<allow pkg="io.opencensus.common"/>
<allow pkg="io.opencensus.impl"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@

package io.opencensus.impl.internal;

import com.google.common.annotations.VisibleForTesting;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import io.opencensus.implcore.internal.DaemonThreadFactory;
import io.opencensus.implcore.internal.EventQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

Expand Down Expand Up @@ -96,7 +100,7 @@ public final class DisruptorEventQueue implements EventQueue {
// TODO(aveitch): consider making this a parameter to the constructor, so the queue can be
// configured to a size appropriate to the system (smaller/less busy systems will not need as
// large a queue.
private static final int DISRUPTOR_BUFFER_SIZE = 8192;
@VisibleForTesting static final int DISRUPTOR_BUFFER_SIZE = 8192;
// The single instance of the class.
private static final DisruptorEventQueue eventQueue = create();

Expand All @@ -105,6 +109,8 @@ public final class DisruptorEventQueue implements EventQueue {

private volatile DisruptorEnqueuer enqueuer;

@VisibleForTesting static final AtomicLong overflowCount = new AtomicLong();

// Creates a new EventQueue. Private to prevent creation of non-singleton instance.
private DisruptorEventQueue(Disruptor<DisruptorEvent> disruptor, DisruptorEnqueuer enqueuer) {
this.disruptor = disruptor;
Expand All @@ -130,14 +136,27 @@ private static DisruptorEventQueue create() {
DisruptorEnqueuer enqueuer =
new DisruptorEnqueuer() {
@Override
public void enqueue(Entry entry) {
long sequence = ringBuffer.next();
public boolean enqueue(Entry entry) {
long sequence;
try {
sequence = ringBuffer.tryNext();
} catch (InsufficientCapacityException e) {
// Show warning log only once to prevent blocking caused by logger.
if (overflowCount.getAndIncrement() == 0) {
logger.log(
Level.WARNING,
"Dropping some events due to queue overflow."
+ " Consider to reduce sampling rate.");
}
return false;
}
try {
DisruptorEvent event = ringBuffer.get(sequence);
event.setEntry(entry);
} finally {
ringBuffer.publish(sequence);
}
return true;
}
};
return new DisruptorEventQueue(disruptor, enqueuer);
Expand All @@ -159,7 +178,9 @@ public static DisruptorEventQueue getInstance() {
*/
@Override
public void enqueue(Entry entry) {
enqueuer.enqueue(entry);
if (!enqueuer.enqueue(entry)) {
entry.rejected();
}
}

/** Shuts down the underlying disruptor. */
Expand All @@ -170,10 +191,11 @@ public void shutdown() {
final AtomicBoolean logged = new AtomicBoolean(false);

@Override
public void enqueue(Entry entry) {
public boolean enqueue(Entry entry) {
if (!logged.getAndSet(true)) {
logger.log(Level.INFO, "Attempted to enqueue entry after Disruptor shutdown.");
}
return false;
}
};

Expand All @@ -183,7 +205,14 @@ public void enqueue(Entry entry) {
// Allows this event queue to safely shutdown by not enqueuing events on the ring buffer
private abstract static class DisruptorEnqueuer {

public abstract void enqueue(Entry entry);
/**
* Try to enqueue Entry into the queue.
*
* @param entry Entry to enqueue
* @return If enqueue failed, returns false.
*/
@CheckReturnValue
public abstract boolean enqueue(Entry entry);
}

// An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,47 +19,67 @@
import static com.google.common.truth.Truth.assertThat;

import io.opencensus.implcore.internal.EventQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Unit tests for {@link DisruptorEventQueue}. */
@RunWith(JUnit4.class)
public class DisruptorEventQueueTest {

// Simple class to use that keeps an incrementing counter. Will fail with an assertion if
// increment is used from multiple threads, or if the stored value is different from that expected
// by the caller.
private static class Counter {
private int count;

@Nullable private final CountDownLatch latch;
private final AtomicInteger count = new AtomicInteger();
private volatile long id; // stores thread ID used in first increment operation.

public Counter() {
count = 0;
this(null);
}

public Counter(@Nullable CountDownLatch latch) {
this.latch = latch;
id = -1;
}

// Increments counter by 1. Will fail in assertion if multiple different threads are used
// (the EventQueue backend should be single-threaded).
public void increment() {
if (latch != null) {
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

long tid = Thread.currentThread().getId();
if (id == -1) {
assertThat(count).isEqualTo(0);
assertThat(count.get()).isEqualTo(0);
id = tid;
} else {
assertThat(id).isEqualTo(tid);
}
count++;
count.incrementAndGet();
}

// Check the current value of the counter. Assert if it is not the expected value.
public void check(int value) {
assertThat(count).isEqualTo(value);
assertThat(count.get()).isEqualTo(value);
}
}

// EventQueueEntry for incrementing a Counter.
private static class IncrementEvent implements EventQueue.Entry {

private final Counter counter;
private volatile boolean isRejected = false;

IncrementEvent(Counter counter) {
this.counter = counter;
Expand All @@ -69,6 +89,11 @@ private static class IncrementEvent implements EventQueue.Entry {
public void process() {
counter.increment();
}

@Override
public void rejected() {
isRejected = true;
}
}

@Test
Expand All @@ -88,10 +113,20 @@ public void incrementOnce() {
@Test
public void incrementTenK() {
final int tenK = 10000;
final int sleepEach = 1000;
Counter counter = new Counter();
for (int i = 0; i < tenK; i++) {
IncrementEvent ie = new IncrementEvent(counter);
DisruptorEventQueue.getInstance().enqueue(ie);

// Sleep to prevent queue overflow
if (i % sleepEach == 0) {
try {
Thread.sleep(10);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
// Sleep briefly, to allow background operations to complete.
try {
Expand All @@ -101,4 +136,33 @@ public void incrementTenK() {
}
counter.check(tenK);
}

@Test(timeout = 30 * 1000)
public void shouldNotBlockWhenOverflow() {
long overflowCountBefore = DisruptorEventQueue.overflowCount.get();

// Queue blocking events to fill queue
CountDownLatch latch = new CountDownLatch(1);
Counter counter = new Counter(latch);
for (int i = 0; i < DisruptorEventQueue.DISRUPTOR_BUFFER_SIZE; i++) {
IncrementEvent ie = new IncrementEvent(counter);
DisruptorEventQueue.getInstance().enqueue(ie);
}
counter.check(0);

// Queue event into filled queue to test overflow behavior
IncrementEvent ie = new IncrementEvent(counter);
DisruptorEventQueue.getInstance().enqueue(ie);
assertThat(ie.isRejected).isTrue();
assertThat(DisruptorEventQueue.overflowCount.get()).isEqualTo(overflowCountBefore + 1);

// Cleanup events
latch.countDown();
try {
Thread.sleep(500);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
counter.check(DisruptorEventQueue.DISRUPTOR_BUFFER_SIZE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,11 @@ interface Entry {
* associated {@link EventQueue}.
*/
void process();

/**
* Cleanup resources associated with this entry to prevent leak. This method is called when this
* event is rejected. Note that this method might be called in foreground (application) thread.
*/
void rejected();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,8 @@ public void process() {
// Add Timestamp to value after it went through the DisruptorQueue.
statsManager.measureToViewMap.record(tags, stats, statsManager.clock.now());
}

@Override
public void rejected() {}
}
}
Loading