diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs index 6ac96b2..fc04155 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs @@ -49,6 +49,7 @@ public class PeriodicBatchingSink : ILogEventSink, IDisposable, IBatchedLogEvent readonly FailureAwareBatchScheduler _batchScheduler; readonly Queue _currentBatch = new(); readonly Task _waitForShutdownSignal; + Task? _cachedWaitToRead; /// /// Constant used with legacy constructor to indicate that the internal queue shouldn't be limited. @@ -260,7 +261,10 @@ async Task LoopAsync() // Wait until `reader` has items to read. Returns `false` if the `timeout` task completes, or if the reader is cancelled. async Task TryWaitToReadAsync(ChannelReader reader, Task timeout, CancellationToken cancellationToken) { - var completed = await Task.WhenAny(timeout, reader.WaitToReadAsync(cancellationToken).AsTask()).ConfigureAwait(false); + var waitToRead = _cachedWaitToRead ?? reader.WaitToReadAsync(cancellationToken).AsTask(); + _cachedWaitToRead = null; + + var completed = await Task.WhenAny(timeout, waitToRead).ConfigureAwait(false); // Avoid unobserved task exceptions in the cancellation and failure cases. Note that we may not end up observing // read task cancellation exceptions during shutdown, may be some room to improve. @@ -268,9 +272,21 @@ async Task TryWaitToReadAsync(ChannelReader reader, Task timeout { WriteToSelfLog($"could not read from queue: {completed.Exception}"); } + + if (completed == timeout) + { + // Dropping references to `waitToRead` will cause it and some supporting objects to leak; disposing it + // will break the channel and cause future attempts to read to fail. So, we cache and reuse it next time + // around the loop. - // `Task.IsCompletedSuccessfully` not available in .NET Standard 2.0/Framework. - return completed != timeout && completed is { IsCompleted: true, IsCanceled: false, IsFaulted: false }; + _cachedWaitToRead = waitToRead; + return false; + } + + if (waitToRead.Status is not TaskStatus.RanToCompletion) + return false; + + return await waitToRead; } /// diff --git a/test/Serilog.Sinks.PeriodicBatching.Tests/PeriodicBatchingSinkTests.cs b/test/Serilog.Sinks.PeriodicBatching.Tests/PeriodicBatchingSinkTests.cs index b3cbbf5..e753a22 100644 --- a/test/Serilog.Sinks.PeriodicBatching.Tests/PeriodicBatchingSinkTests.cs +++ b/test/Serilog.Sinks.PeriodicBatching.Tests/PeriodicBatchingSinkTests.cs @@ -115,17 +115,16 @@ public void ExecutionContextDoesNotFlowToBatchedSink() [InlineData(false)] public async Task EagerlyEmitFirstEventCausesQuickInitialBatch(bool eagerlyEmit) { - long batchesEmitted = 0; + var x = new ManualResetEvent(false); var bs = new CallbackBatchedSink(_ => { - // ReSharper disable once AccessToModifiedClosure - Interlocked.Increment(ref batchesEmitted); + x.Set(); return Task.CompletedTask; }); var options = new PeriodicBatchingSinkOptions { - Period = TimeSpan.FromSeconds(2), + Period = TimeSpan.FromSeconds(3), EagerlyEmitFirstEvent = eagerlyEmit, BatchSizeLimit = 10, QueueLimit = 1000 @@ -137,7 +136,7 @@ public async Task EagerlyEmitFirstEventCausesQuickInitialBatch(bool eagerlyEmit) pbs.Emit(evt); await Task.Delay(1900); - Assert.Equal(eagerlyEmit ? 1L : 0, Interlocked.Read(ref batchesEmitted)); + Assert.Equal(eagerlyEmit, x.WaitOne(0)); #if FEATURE_ASYNCDISPOSABLE await pbs.DisposeAsync(); @@ -145,6 +144,6 @@ public async Task EagerlyEmitFirstEventCausesQuickInitialBatch(bool eagerlyEmit) pbs.Dispose(); #endif - Assert.Equal(1L, Interlocked.Read(ref batchesEmitted)); + Assert.True(x.WaitOne(0)); } } \ No newline at end of file