Skip to content
This repository has been archived by the owner on Dec 23, 2023. It is now read-only.

Commit

Permalink
Add EventQueue.Entry#rejected() for cleanup logic to prevent leak (#1809
Browse files Browse the repository at this point in the history
)
  • Loading branch information
saiya committed Apr 11, 2019
1 parent 8b886b0 commit 43d7647
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

Expand Down Expand Up @@ -135,7 +136,7 @@ private static DisruptorEventQueue create() {
DisruptorEnqueuer enqueuer =
new DisruptorEnqueuer() {
@Override
public void enqueue(Entry entry) {
public boolean enqueue(Entry entry) {
long sequence;
try {
sequence = ringBuffer.tryNext();
Expand All @@ -147,14 +148,15 @@ public void enqueue(Entry entry) {
"Dropping some events due to queue overflow."
+ " Consider to reduce sampling rate.");
}
return;
return false;
}
try {
DisruptorEvent event = ringBuffer.get(sequence);
event.setEntry(entry);
} finally {
ringBuffer.publish(sequence);
}
return true;
}
};
return new DisruptorEventQueue(disruptor, enqueuer);
Expand All @@ -176,7 +178,9 @@ public static DisruptorEventQueue getInstance() {
*/
@Override
public void enqueue(Entry entry) {
enqueuer.enqueue(entry);
if (!enqueuer.enqueue(entry)) {
entry.rejected();
}
}

/** Shuts down the underlying disruptor. */
Expand All @@ -187,10 +191,11 @@ public void shutdown() {
final AtomicBoolean logged = new AtomicBoolean(false);

@Override
public void enqueue(Entry entry) {
public boolean enqueue(Entry entry) {
if (!logged.getAndSet(true)) {
logger.log(Level.INFO, "Attempted to enqueue entry after Disruptor shutdown.");
}
return false;
}
};

Expand All @@ -200,7 +205,14 @@ public void enqueue(Entry entry) {
// Allows this event queue to safely shutdown by not enqueuing events on the ring buffer
private abstract static class DisruptorEnqueuer {

public abstract void enqueue(Entry entry);
/**
* Try to enqueue Entry into the queue.
*
* @param entry Entry to enqueue
* @return If enqueue failed, returns false.
*/
@CheckReturnValue
public abstract boolean enqueue(Entry entry);
}

// An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public void check(int value) {
private static class IncrementEvent implements EventQueue.Entry {

private final Counter counter;
private volatile boolean isRejected = false;

IncrementEvent(Counter counter) {
this.counter = counter;
Expand All @@ -88,6 +89,11 @@ private static class IncrementEvent implements EventQueue.Entry {
public void process() {
counter.increment();
}

@Override
public void rejected() {
isRejected = true;
}
}

@Test
Expand Down Expand Up @@ -147,6 +153,7 @@ public void shouldNotBlockWhenOverflow() {
// Queue event into filled queue to test overflow behavior
IncrementEvent ie = new IncrementEvent(counter);
DisruptorEventQueue.getInstance().enqueue(ie);
assertThat(ie.isRejected).isTrue();
assertThat(DisruptorEventQueue.overflowCount.get()).isEqualTo(overflowCountBefore + 1);

// Cleanup events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,11 @@ interface Entry {
* associated {@link EventQueue}.
*/
void process();

/**
* Cleanup resources associated with this entry to prevent leak. This method is called when this
* event is rejected. Note that this method might be called in foreground (application) thread.
*/
void rejected();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,8 @@ public void process() {
// Add Timestamp to value after it went through the DisruptorQueue.
statsManager.measureToViewMap.record(tags, stats, statsManager.clock.now());
}

@Override
public void rejected() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,20 @@ private static final class SpanStartEvent implements EventQueue.Entry {
@Override
public void process() {
if (activeSpansExporter != null) {
// Strictly speaking, SpanEndEvent#rejected() can be called before
// SpanStartEvent#process() in rare case.
// Because SpanStartEvent#process() is called via queue.
// In such case, span stay in activeSpansExporter forever because
// onEnd(span) is called before onStart(span).
// If we really need to prevent such case, should check span.hasBeenEnded here.
// The hasBeenEnded flag is always true in such case, so that we can skip
// onStart(span) call.
activeSpansExporter.onStart(span);
}
}

@Override
public void rejected() {}
}

// An EventQueue entry that records the end of the span event.
Expand All @@ -116,14 +127,29 @@ public void process() {
if (span.getContext().getTraceOptions().isSampled()) {
spanExporter.addSpan(span);
}
removeFromRunningSpanStore();
if (sampledSpanStore != null) {
sampledSpanStore.considerForSampling(span);
}
}

@Override
public void rejected() {
// Should remove from runningSpanStore to prevent memory leak
removeFromRunningSpanStore();
}

private void removeFromRunningSpanStore() {
if (runningSpanStore != null) {
try {
// Note that corresponding SpanStartEvent / onStart(span) might not have called if queue was full.
// Note that corresponding SpanStartEvent / onStart(span) might not have called if queue
// was full.
runningSpanStore.onEnd(span);
} catch (IllegalArgumentException e) {} // Span was not in the running span list
}
if (sampledSpanStore != null) {
sampledSpanStore.considerForSampling(span);
} catch (IllegalArgumentException e) {
// Span was not in the running span list.
// This case is caused by queue overflow.
// Intentionally avoided logging to prevent blocking caused by massive log output
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public void onStart(RecordEventsSpanImpl span) {

@Override
public void onEnd(RecordEventsSpanImpl span) {
runningSpans.removeElement(span); // Throws IllegalArgumentException if given span is not in the list
// Throws IllegalArgumentException if given span is not in the list
runningSpans.removeElement(span);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ private RegisterSpanNameEvent(
public void process() {
sampledSpanStore.internaltRegisterSpanNamesForCollection(spanNames);
}

@Override
public void rejected() {}
}

@Override
Expand Down Expand Up @@ -341,6 +344,9 @@ private UnregisterSpanNameEvent(
public void process() {
sampledSpanStore.internalUnregisterSpanNamesForCollection(spanNames);
}

@Override
public void rejected() {}
}

@Override
Expand Down

0 comments on commit 43d7647

Please sign in to comment.