Skip to content

Commit

Permalink
Merge branch 'dev' into reinstate-inheritance-api
Browse files Browse the repository at this point in the history
  • Loading branch information
nblumhardt authored May 1, 2024
2 parents c45cc64 + 54daef1 commit 6df936a
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class PeriodicBatchingSink : ILogEventSink, IDisposable, IBatchedLogEvent
readonly FailureAwareBatchScheduler _batchScheduler;
readonly Queue<LogEvent> _currentBatch = new();
readonly Task _waitForShutdownSignal;
Task<bool>? _cachedWaitToRead;

/// <summary>
/// Constant used with legacy constructor to indicate that the internal queue shouldn't be limited.
Expand Down Expand Up @@ -260,17 +261,32 @@ async Task LoopAsync()
// Wait until `reader` has items to read. Returns `false` if the `timeout` task completes, or if the reader is cancelled.
async Task<bool> TryWaitToReadAsync(ChannelReader<LogEvent> reader, Task timeout, CancellationToken cancellationToken)
{
var completed = await Task.WhenAny(timeout, reader.WaitToReadAsync(cancellationToken).AsTask()).ConfigureAwait(false);
var waitToRead = _cachedWaitToRead ?? reader.WaitToReadAsync(cancellationToken).AsTask();
_cachedWaitToRead = null;

var completed = await Task.WhenAny(timeout, waitToRead).ConfigureAwait(false);

// Avoid unobserved task exceptions in the cancellation and failure cases. Note that we may not end up observing
// read task cancellation exceptions during shutdown, may be some room to improve.
if (completed is { Exception: not null, IsCanceled: false })
{
WriteToSelfLog($"could not read from queue: {completed.Exception}");
}

if (completed == timeout)
{
// Dropping references to `waitToRead` will cause it and some supporting objects to leak; disposing it
// will break the channel and cause future attempts to read to fail. So, we cache and reuse it next time
// around the loop.

// `Task.IsCompletedSuccessfully` not available in .NET Standard 2.0/Framework.
return completed != timeout && completed is { IsCompleted: true, IsCanceled: false, IsFaulted: false };
_cachedWaitToRead = waitToRead;
return false;
}

if (waitToRead.Status is not TaskStatus.RanToCompletion)
return false;

return await waitToRead;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,16 @@ public void ExecutionContextDoesNotFlowToBatchedSink()
[InlineData(false)]
public async Task EagerlyEmitFirstEventCausesQuickInitialBatch(bool eagerlyEmit)
{
long batchesEmitted = 0;
var x = new ManualResetEvent(false);
var bs = new CallbackBatchedSink(_ =>
{
// ReSharper disable once AccessToModifiedClosure
Interlocked.Increment(ref batchesEmitted);
x.Set();
return Task.CompletedTask;
});

var options = new PeriodicBatchingSinkOptions
{
Period = TimeSpan.FromSeconds(2),
Period = TimeSpan.FromSeconds(3),
EagerlyEmitFirstEvent = eagerlyEmit,
BatchSizeLimit = 10,
QueueLimit = 1000
Expand All @@ -137,14 +136,14 @@ public async Task EagerlyEmitFirstEventCausesQuickInitialBatch(bool eagerlyEmit)
pbs.Emit(evt);

await Task.Delay(1900);
Assert.Equal(eagerlyEmit ? 1L : 0, Interlocked.Read(ref batchesEmitted));
Assert.Equal(eagerlyEmit, x.WaitOne(0));

#if FEATURE_ASYNCDISPOSABLE
await pbs.DisposeAsync();
#else
pbs.Dispose();
#endif

Assert.Equal(1L, Interlocked.Read(ref batchesEmitted));
Assert.True(x.WaitOne(0));
}
}

0 comments on commit 6df936a

Please sign in to comment.