Skip to content
This repository has been archived by the owner on Dec 23, 2023. It is now read-only.

Commit

Permalink
Add EventQueue.Entry#rejected() for cleanup logic to prevent leak (#1809
Browse files Browse the repository at this point in the history
)
  • Loading branch information
saiya committed Apr 11, 2019
1 parent 8b886b0 commit 6ff8ccd
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

Expand Down Expand Up @@ -135,7 +136,7 @@ private static DisruptorEventQueue create() {
DisruptorEnqueuer enqueuer =
new DisruptorEnqueuer() {
@Override
public void enqueue(Entry entry) {
public boolean enqueue(Entry entry) {
long sequence;
try {
sequence = ringBuffer.tryNext();
Expand All @@ -147,14 +148,15 @@ public void enqueue(Entry entry) {
"Dropping some events due to queue overflow."
+ " Consider to reduce sampling rate.");
}
return;
return false;
}
try {
DisruptorEvent event = ringBuffer.get(sequence);
event.setEntry(entry);
} finally {
ringBuffer.publish(sequence);
}
return true;
}
};
return new DisruptorEventQueue(disruptor, enqueuer);
Expand All @@ -176,7 +178,9 @@ public static DisruptorEventQueue getInstance() {
*/
@Override
public void enqueue(Entry entry) {
enqueuer.enqueue(entry);
if (!enqueuer.enqueue(entry)) {
entry.rejected();
}
}

/** Shuts down the underlying disruptor. */
Expand All @@ -187,10 +191,11 @@ public void shutdown() {
final AtomicBoolean logged = new AtomicBoolean(false);

@Override
public void enqueue(Entry entry) {
public boolean enqueue(Entry entry) {
if (!logged.getAndSet(true)) {
logger.log(Level.INFO, "Attempted to enqueue entry after Disruptor shutdown.");
}
return false;
}
};

Expand All @@ -200,7 +205,14 @@ public void enqueue(Entry entry) {
// Allows this event queue to safely shutdown by not enqueuing events on the ring buffer
private abstract static class DisruptorEnqueuer {

public abstract void enqueue(Entry entry);
/**
* Try to enqueue Entry into the queue.
*
* @param entry Entry to enqueue
* @return If enqueue failed, returns false.
*/
@CheckReturnValue
public abstract boolean enqueue(Entry entry);
}

// An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public void check(int value) {
private static class IncrementEvent implements EventQueue.Entry {

private final Counter counter;
private volatile boolean isRejected = false;

IncrementEvent(Counter counter) {
this.counter = counter;
Expand All @@ -88,6 +89,11 @@ private static class IncrementEvent implements EventQueue.Entry {
public void process() {
counter.increment();
}

@Override
public void rejected() {
isRejected = true;
}
}

@Test
Expand Down Expand Up @@ -147,6 +153,7 @@ public void shouldNotBlockWhenOverflow() {
// Queue event into filled queue to test overflow behavior
IncrementEvent ie = new IncrementEvent(counter);
DisruptorEventQueue.getInstance().enqueue(ie);
assertThat(ie.isRejected).isTrue();
assertThat(DisruptorEventQueue.overflowCount.get()).isEqualTo(overflowCountBefore + 1);

// Cleanup events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,10 @@ interface Entry {
* associated {@link EventQueue}.
*/
void process();

/**
*
*/
void rejected();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,8 @@ public void process() {
// Add Timestamp to value after it went through the DisruptorQueue.
statsManager.measureToViewMap.record(tags, stats, statsManager.clock.now());
}

@Override
public void rejected() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ public void process() {
activeSpansExporter.onStart(span);
}
}

@Override
public void rejected() {}
}

// An EventQueue entry that records the end of the span event.
Expand All @@ -116,15 +119,25 @@ public void process() {
if (span.getContext().getTraceOptions().isSampled()) {
spanExporter.addSpan(span);
}
removeFromRunningSpanStore();
if (sampledSpanStore != null) {
sampledSpanStore.considerForSampling(span);
}
}

@Override
public void rejected() {
// Should remove from runningSpanStore to prevent memory leak
removeFromRunningSpanStore();
}

private void removeFromRunningSpanStore() {
if (runningSpanStore != null) {
try {
// Note that corresponding SpanStartEvent / onStart(span) might not have called if queue was full.
runningSpanStore.onEnd(span);
} catch (IllegalArgumentException e) {} // Span was not in the running span list
}
if (sampledSpanStore != null) {
sampledSpanStore.considerForSampling(span);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ private RegisterSpanNameEvent(
public void process() {
sampledSpanStore.internaltRegisterSpanNamesForCollection(spanNames);
}

@Override
public void rejected() {}
}

@Override
Expand Down Expand Up @@ -341,6 +344,9 @@ private UnregisterSpanNameEvent(
public void process() {
sampledSpanStore.internalUnregisterSpanNamesForCollection(spanNames);
}

@Override
public void rejected() {}
}

@Override
Expand Down

0 comments on commit 6ff8ccd

Please sign in to comment.