Skip to content
This repository has been archived by the owner on Dec 23, 2023. It is now read-only.

Commit

Permalink
Remove running span even also in rare case to prevent memory leak (#1809
Browse files Browse the repository at this point in the history
)
  • Loading branch information
saiya committed May 16, 2019
1 parent 43d7647 commit a116593
Showing 1 changed file with 50 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.opencensus.implcore.trace.export.SpanExporterImpl;
import io.opencensus.trace.Span.Options;
import io.opencensus.trace.export.SpanData;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

Expand All @@ -39,6 +40,8 @@ public final class StartEndHandlerImpl implements StartEndHandler {
// true if any of (runningSpanStore OR sampledSpanStore) are different than null, which
// means the spans with RECORD_EVENTS should be enqueued in the queue.
private final boolean enqueueEventForNonSampledSpans;
private final EventCancellationTokenStore eventCancellationTokenStore =
new EventCancellationTokenStore();

/**
* Constructs a new {@code StartEndHandlerImpl}.
Expand All @@ -63,40 +66,52 @@ public StartEndHandlerImpl(
@Override
public void onStart(RecordEventsSpanImpl span) {
if (span.getOptions().contains(Options.RECORD_EVENTS) && enqueueEventForNonSampledSpans) {
eventQueue.enqueue(new SpanStartEvent(span, runningSpanStore));
eventQueue.enqueue(new SpanStartEvent(span, runningSpanStore, eventCancellationTokenStore));
}
}

@Override
public void onEnd(RecordEventsSpanImpl span) {
if ((span.getOptions().contains(Options.RECORD_EVENTS) && enqueueEventForNonSampledSpans)
|| span.getContext().getTraceOptions().isSampled()) {
eventQueue.enqueue(new SpanEndEvent(span, spanExporter, runningSpanStore, sampledSpanStore));
eventQueue.enqueue(
new SpanEndEvent(
span, spanExporter, runningSpanStore, sampledSpanStore, eventCancellationTokenStore));
}
}

// An EventQueue entry that records the start of the span event.
private static final class SpanStartEvent implements EventQueue.Entry {
private final RecordEventsSpanImpl span;
@Nullable private final RunningSpanStoreImpl activeSpansExporter;
private final EventCancellationTokenStore eventCancellationTokenStore;
private final long eventCancellationToken;

SpanStartEvent(RecordEventsSpanImpl span, @Nullable RunningSpanStoreImpl activeSpansExporter) {
SpanStartEvent(
RecordEventsSpanImpl span,
@Nullable RunningSpanStoreImpl activeSpansExporter,
EventCancellationTokenStore eventCancellationTokenStore) {
this.span = span;
this.activeSpansExporter = activeSpansExporter;
this.eventCancellationTokenStore = eventCancellationTokenStore;
this.eventCancellationToken = eventCancellationTokenStore.generateCancellationToken();
}

@Override
public void process() {
if (activeSpansExporter != null) {
// Strictly speaking, SpanEndEvent#rejected() can be called before
// SpanStartEvent#process() in rare case.
// Because SpanStartEvent#process() is called via queue.
// In such case, span stay in activeSpansExporter forever because
// onEnd(span) is called before onStart(span).
// If we really need to prevent such case, should check span.hasBeenEnded here.
// The hasBeenEnded flag is always true in such case, so that we can skip
// onStart(span) call.
if (activeSpansExporter != null
&& !eventCancellationTokenStore.isCanceled(eventCancellationToken)) {
// Note #1: At this moment, SpanEndEvent#rejected might call
// eventCancellationTokenStore.cancelEvents + runningSpanStore.onEnd
activeSpansExporter.onStart(span);

if (eventCancellationTokenStore.isCanceled(eventCancellationToken)) {
// If SpanEndEvent#rejected called eventCancellationTokenStore.cancelEvents at moment #1,
// should close activeSpan.
// Because SpanEndEvent#rejected might had called runningSpanStore.onEnd before
// activeSpansExporter.onStart
activeSpansExporter.onEnd(span);
}
}
}

Expand All @@ -110,16 +125,19 @@ private static final class SpanEndEvent implements EventQueue.Entry {
@Nullable private final RunningSpanStoreImpl runningSpanStore;
private final SpanExporterImpl spanExporter;
@Nullable private final SampledSpanStoreImpl sampledSpanStore;
private final EventCancellationTokenStore eventCancellationTokenStore;

SpanEndEvent(
RecordEventsSpanImpl span,
SpanExporterImpl spanExporter,
@Nullable RunningSpanStoreImpl runningSpanStore,
@Nullable SampledSpanStoreImpl sampledSpanStore) {
@Nullable SampledSpanStoreImpl sampledSpanStore,
EventCancellationTokenStore eventCancellationTokenStore) {
this.span = span;
this.runningSpanStore = runningSpanStore;
this.spanExporter = spanExporter;
this.sampledSpanStore = sampledSpanStore;
this.eventCancellationTokenStore = eventCancellationTokenStore;
}

@Override
Expand All @@ -135,6 +153,8 @@ public void process() {

@Override
public void rejected() {
eventCancellationTokenStore.cancelEvents();

// Should remove from runningSpanStore to prevent memory leak
removeFromRunningSpanStore();
}
Expand All @@ -153,4 +173,21 @@ private void removeFromRunningSpanStore() {
}
}
}

private static final class EventCancellationTokenStore {
private final AtomicLong sequence = new AtomicLong(0);
private volatile long canceledAt = -1;

long generateCancellationToken() {
return sequence.getAndIncrement();
}

boolean isCanceled(long token) {
return token <= this.canceledAt;
}

void cancelEvents() {
this.canceledAt = this.sequence.get();
}
}
}

0 comments on commit a116593

Please sign in to comment.