From 31f08ba4c81634347b93fafa4660912fb15b87ee Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Wed, 1 May 2024 10:37:29 +1000 Subject: [PATCH 1/2] Avoid leaking task/cancellation objects when no events are passing through the sink --- .../PeriodicBatching/PeriodicBatchingSink.cs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs index a14c0e6..055d369 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs @@ -49,6 +49,7 @@ public sealed class PeriodicBatchingSink : ILogEventSink, IDisposable readonly FailureAwareBatchScheduler _batchScheduler; readonly Queue _currentBatch = new(); readonly Task _waitForShutdownSignal; + Task? _cachedWaitToRead; /// /// Construct a . @@ -205,7 +206,10 @@ async Task LoopAsync() // Wait until `reader` has items to read. Returns `false` if the `timeout` task completes, or if the reader is cancelled. async Task TryWaitToReadAsync(ChannelReader reader, Task timeout, CancellationToken cancellationToken) { - var completed = await Task.WhenAny(timeout, reader.WaitToReadAsync(cancellationToken).AsTask()).ConfigureAwait(false); + var waitToRead = _cachedWaitToRead ?? reader.WaitToReadAsync(cancellationToken).AsTask(); + _cachedWaitToRead = null; + + var completed = await Task.WhenAny(timeout, waitToRead).ConfigureAwait(false); // Avoid unobserved task exceptions in the cancellation and failure cases. Note that we may not end up observing // read task cancellation exceptions during shutdown, may be some room to improve. @@ -213,9 +217,18 @@ async Task TryWaitToReadAsync(ChannelReader reader, Task timeout { WriteToSelfLog($"could not read from queue: {completed.Exception}"); } + + if (completed == timeout) + { + // Dropping references to `waitToRead` will cause it and some supporting objects to leak; disposing it + // will break the channel and cause future attempts to read to fail. So, we cache and reuse it next time + // around the loop. - // `Task.IsCompletedSuccessfully` not available in .NET Standard 2.0/Framework. - return completed != timeout && completed is { IsCompleted: true, IsCanceled: false, IsFaulted: false }; + _cachedWaitToRead = waitToRead; + return false; + } + + return await waitToRead; } /// From 393e4ad29122e64ee5cfed620576f5bdd20a29cd Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Wed, 1 May 2024 11:24:14 +1000 Subject: [PATCH 2/2] Defensive check to make sure OperationCanceledException is not thrown; try to make test more robust on slow machines --- .../Sinks/PeriodicBatching/PeriodicBatchingSink.cs | 3 +++ .../PeriodicBatchingSinkTests.cs | 11 +++++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs index 055d369..80ce29a 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs @@ -228,6 +228,9 @@ async Task TryWaitToReadAsync(ChannelReader reader, Task timeout return false; } + if (waitToRead.Status is not TaskStatus.RanToCompletion) + return false; + return await waitToRead; } diff --git a/test/Serilog.Sinks.PeriodicBatching.Tests/PeriodicBatchingSinkTests.cs b/test/Serilog.Sinks.PeriodicBatching.Tests/PeriodicBatchingSinkTests.cs index b3cbbf5..e753a22 100644 --- a/test/Serilog.Sinks.PeriodicBatching.Tests/PeriodicBatchingSinkTests.cs +++ b/test/Serilog.Sinks.PeriodicBatching.Tests/PeriodicBatchingSinkTests.cs @@ -115,17 +115,16 @@ public void ExecutionContextDoesNotFlowToBatchedSink() [InlineData(false)] public async Task EagerlyEmitFirstEventCausesQuickInitialBatch(bool eagerlyEmit) { - long batchesEmitted = 0; + var x = new ManualResetEvent(false); var bs = new CallbackBatchedSink(_ => { - // ReSharper disable once AccessToModifiedClosure - Interlocked.Increment(ref batchesEmitted); + x.Set(); return Task.CompletedTask; }); var options = new PeriodicBatchingSinkOptions { - Period = TimeSpan.FromSeconds(2), + Period = TimeSpan.FromSeconds(3), EagerlyEmitFirstEvent = eagerlyEmit, BatchSizeLimit = 10, QueueLimit = 1000 @@ -137,7 +136,7 @@ public async Task EagerlyEmitFirstEventCausesQuickInitialBatch(bool eagerlyEmit) pbs.Emit(evt); await Task.Delay(1900); - Assert.Equal(eagerlyEmit ? 1L : 0, Interlocked.Read(ref batchesEmitted)); + Assert.Equal(eagerlyEmit, x.WaitOne(0)); #if FEATURE_ASYNCDISPOSABLE await pbs.DisposeAsync(); @@ -145,6 +144,6 @@ public async Task EagerlyEmitFirstEventCausesQuickInitialBatch(bool eagerlyEmit) pbs.Dispose(); #endif - Assert.Equal(1L, Interlocked.Read(ref batchesEmitted)); + Assert.True(x.WaitOne(0)); } } \ No newline at end of file