Skip to content
This repository has been archived by the owner on Dec 23, 2023. It is now read-only.

Commit

Permalink
Remove running span even also in rare case to prevent memory leak (#1809
Browse files Browse the repository at this point in the history
)
  • Loading branch information
saiya committed May 16, 2019
1 parent 04fd00b commit a55ccbc
Showing 1 changed file with 55 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.opencensus.implcore.trace.export.SpanExporterImpl;
import io.opencensus.trace.Span.Options;
import io.opencensus.trace.export.SpanData;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

Expand All @@ -36,6 +37,8 @@ public final class StartEndHandlerImpl implements StartEndHandler {
private final InProcessRunningSpanStore inProcessRunningSpanStore;
private final SampledSpanStoreImpl sampledSpanStore;
private final EventQueue eventQueue;
private final EventCancellationTokenStore eventCancellationTokenStore =
new EventCancellationTokenStore();

/**
* Constructs a new {@code StartEndHandlerImpl}.
Expand All @@ -60,7 +63,8 @@ public StartEndHandlerImpl(
public void onStart(RecordEventsSpanImpl span) {
if (span.getOptions().contains(Options.RECORD_EVENTS)
&& inProcessRunningSpanStore.getEnabled()) {
eventQueue.enqueue(new SpanStartEvent(span, inProcessRunningSpanStore));
eventQueue.enqueue(
new SpanStartEvent(span, inProcessRunningSpanStore, eventCancellationTokenStore));
}
}

Expand All @@ -70,31 +74,47 @@ public void onEnd(RecordEventsSpanImpl span) {
&& (inProcessRunningSpanStore.getEnabled() || sampledSpanStore.getEnabled()))
|| span.getContext().getTraceOptions().isSampled()) {
eventQueue.enqueue(
new SpanEndEvent(span, spanExporter, inProcessRunningSpanStore, sampledSpanStore));
new SpanEndEvent(
span,
spanExporter,
inProcessRunningSpanStore,
sampledSpanStore,
eventCancellationTokenStore));
}
}

// An EventQueue entry that records the start of the span event.
private static final class SpanStartEvent implements EventQueue.Entry {
private final RecordEventsSpanImpl span;
private final InProcessRunningSpanStore inProcessRunningSpanStore;
private final EventCancellationTokenStore eventCancellationTokenStore;
private final long eventCancellationToken;

SpanStartEvent(RecordEventsSpanImpl span, InProcessRunningSpanStore inProcessRunningSpanStore) {
SpanStartEvent(
RecordEventsSpanImpl span,
InProcessRunningSpanStore inProcessRunningSpanStore,
EventCancellationTokenStore eventCancellationTokenStore) {
this.span = span;
this.inProcessRunningSpanStore = inProcessRunningSpanStore;
this.eventCancellationTokenStore = eventCancellationTokenStore;
this.eventCancellationToken = eventCancellationTokenStore.generateCancellationToken();
}

@Override
public void process() {
// Strictly speaking, SpanEndEvent#rejected() can be called before
// SpanStartEvent#process() in rare case.
// Because SpanStartEvent#process() is called via queue.
// In such case, span stay in activeSpansExporter forever because
// onEnd(span) is called before onStart(span).
// If we really need to prevent such case, should check span.hasBeenEnded here.
// The hasBeenEnded flag is always true in such case, so that we can skip
// onStart(span) call.
inProcessRunningSpanStore.onStart(span);
if (!eventCancellationTokenStore.isCanceled(eventCancellationToken)) {
// Note #1: At this moment, SpanEndEvent#rejected might call
// eventCancellationTokenStore.cancelEvents + runningSpanStore.onEnd
inProcessRunningSpanStore.onStart(span);

if (eventCancellationTokenStore.isCanceled(eventCancellationToken)) {
// If SpanEndEvent#rejected called eventCancellationTokenStore.cancelEvents at moment #1,
// should close activeSpan.
// Because SpanEndEvent#rejected might had called runningSpanStore.onEnd before
// activeSpansExporter.onStart
inProcessRunningSpanStore.onEnd(span);
}
}
}

@Override
Expand All @@ -107,16 +127,19 @@ private static final class SpanEndEvent implements EventQueue.Entry {
private final InProcessRunningSpanStore inProcessRunningSpanStore;
private final SpanExporterImpl spanExporter;
@Nullable private final SampledSpanStoreImpl sampledSpanStore;
private final EventCancellationTokenStore eventCancellationTokenStore;

SpanEndEvent(
RecordEventsSpanImpl span,
SpanExporterImpl spanExporter,
InProcessRunningSpanStore inProcessRunningSpanStore,
@Nullable SampledSpanStoreImpl sampledSpanStore) {
@Nullable SampledSpanStoreImpl sampledSpanStore,
EventCancellationTokenStore eventCancellationTokenStore) {
this.span = span;
this.inProcessRunningSpanStore = inProcessRunningSpanStore;
this.spanExporter = spanExporter;
this.sampledSpanStore = sampledSpanStore;
this.eventCancellationTokenStore = eventCancellationTokenStore;
}

@Override
Expand All @@ -134,8 +157,27 @@ public void process() {

@Override
public void rejected() {
eventCancellationTokenStore.cancelEvents();

// Should remove from runningSpanStore to prevent memory leak
inProcessRunningSpanStore.onEnd(span);
}
}

private static final class EventCancellationTokenStore {
private final AtomicLong sequence = new AtomicLong(0);
private volatile long canceledAt = -1;

long generateCancellationToken() {
return sequence.getAndIncrement();
}

boolean isCanceled(long token) {
return token <= this.canceledAt;
}

void cancelEvents() {
this.canceledAt = this.sequence.get();
}
}
}

0 comments on commit a55ccbc

Please sign in to comment.