diff --git a/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java b/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java index 9abea915b6..da57df2e51 100644 --- a/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java +++ b/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.CheckReturnValue; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -135,7 +136,7 @@ private static DisruptorEventQueue create() { DisruptorEnqueuer enqueuer = new DisruptorEnqueuer() { @Override - public void enqueue(Entry entry) { + public boolean enqueue(Entry entry) { long sequence; try { sequence = ringBuffer.tryNext(); @@ -147,7 +148,7 @@ public void enqueue(Entry entry) { "Dropping some events due to queue overflow." + " Consider to reduce sampling rate."); } - return; + return false; } try { DisruptorEvent event = ringBuffer.get(sequence); @@ -155,6 +156,7 @@ public void enqueue(Entry entry) { } finally { ringBuffer.publish(sequence); } + return true; } }; return new DisruptorEventQueue(disruptor, enqueuer); @@ -176,7 +178,9 @@ public static DisruptorEventQueue getInstance() { */ @Override public void enqueue(Entry entry) { - enqueuer.enqueue(entry); + if (!enqueuer.enqueue(entry)) { + entry.rejected(); + } } /** Shuts down the underlying disruptor. */ @@ -187,10 +191,11 @@ public void shutdown() { final AtomicBoolean logged = new AtomicBoolean(false); @Override - public void enqueue(Entry entry) { + public boolean enqueue(Entry entry) { if (!logged.getAndSet(true)) { logger.log(Level.INFO, "Attempted to enqueue entry after Disruptor shutdown."); } + return false; } }; @@ -200,7 +205,14 @@ public void enqueue(Entry entry) { // Allows this event queue to safely shutdown by not enqueuing events on the ring buffer private abstract static class DisruptorEnqueuer { - public abstract void enqueue(Entry entry); + /** + * Try to enqueue Entry into the queue. + * + * @param entry Entry to enqueue + * @return If enqueue failed, returns false. + */ + @CheckReturnValue + public abstract boolean enqueue(Entry entry); } // An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry. diff --git a/impl/src/test/java/io/opencensus/impl/internal/DisruptorEventQueueTest.java b/impl/src/test/java/io/opencensus/impl/internal/DisruptorEventQueueTest.java index 2107717cd3..d6fcb994a5 100644 --- a/impl/src/test/java/io/opencensus/impl/internal/DisruptorEventQueueTest.java +++ b/impl/src/test/java/io/opencensus/impl/internal/DisruptorEventQueueTest.java @@ -79,6 +79,7 @@ public void check(int value) { private static class IncrementEvent implements EventQueue.Entry { private final Counter counter; + private volatile boolean isRejected = false; IncrementEvent(Counter counter) { this.counter = counter; @@ -88,6 +89,11 @@ private static class IncrementEvent implements EventQueue.Entry { public void process() { counter.increment(); } + + @Override + public void rejected() { + isRejected = true; + } } @Test @@ -147,6 +153,7 @@ public void shouldNotBlockWhenOverflow() { // Queue event into filled queue to test overflow behavior IncrementEvent ie = new IncrementEvent(counter); DisruptorEventQueue.getInstance().enqueue(ie); + assertThat(ie.isRejected).isTrue(); assertThat(DisruptorEventQueue.overflowCount.get()).isEqualTo(overflowCountBefore + 1); // Cleanup events diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java b/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java index 6eb1149a92..dbb2378920 100644 --- a/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java @@ -32,5 +32,10 @@ interface Entry { * associated {@link EventQueue}. */ void process(); + + /** + * + */ + void rejected(); } } diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/StatsManager.java b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsManager.java index 17e99d4644..d2d7fb9bc3 100644 --- a/impl_core/src/main/java/io/opencensus/implcore/stats/StatsManager.java +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsManager.java @@ -100,5 +100,8 @@ public void process() { // Add Timestamp to value after it went through the DisruptorQueue. statsManager.measureToViewMap.record(tags, stats, statsManager.clock.now()); } + + @Override + public void rejected() {} } } diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/StartEndHandlerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/StartEndHandlerImpl.java index 5fe0f85e4c..b5524a18f5 100644 --- a/impl_core/src/main/java/io/opencensus/implcore/trace/StartEndHandlerImpl.java +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/StartEndHandlerImpl.java @@ -91,6 +91,9 @@ public void process() { activeSpansExporter.onStart(span); } } + + @Override + public void rejected() {} } // An EventQueue entry that records the end of the span event. @@ -116,15 +119,25 @@ public void process() { if (span.getContext().getTraceOptions().isSampled()) { spanExporter.addSpan(span); } + removeFromRunningSpanStore(); + if (sampledSpanStore != null) { + sampledSpanStore.considerForSampling(span); + } + } + + @Override + public void rejected() { + // Should remove from runningSpanStore to prevent memory leak + removeFromRunningSpanStore(); + } + + private void removeFromRunningSpanStore() { if (runningSpanStore != null) { try { // Note that corresponding SpanStartEvent / onStart(span) might not have called if queue was full. runningSpanStore.onEnd(span); } catch (IllegalArgumentException e) {} // Span was not in the running span list } - if (sampledSpanStore != null) { - sampledSpanStore.considerForSampling(span); - } } } } diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java index a3f16cddbc..1a8328fb23 100644 --- a/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java @@ -313,6 +313,9 @@ private RegisterSpanNameEvent( public void process() { sampledSpanStore.internaltRegisterSpanNamesForCollection(spanNames); } + + @Override + public void rejected() {} } @Override @@ -341,6 +344,9 @@ private UnregisterSpanNameEvent( public void process() { sampledSpanStore.internalUnregisterSpanNamesForCollection(spanNames); } + + @Override + public void rejected() {} } @Override