Skip to content

Commit

Permalink
Merge pull request #78 from serilog/dev
Browse files Browse the repository at this point in the history
4.1.0 Release
  • Loading branch information
nblumhardt authored May 3, 2024
2 parents 87a2ac3 + 9bb1f1c commit d7b0406
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<Description>Buffer batches of log events to be flushed asynchronously.</Description>
<VersionPrefix>4.0.1</VersionPrefix>
<VersionPrefix>4.1.0</VersionPrefix>
<Authors>Serilog Contributors</Authors>
<TargetFrameworks Condition=" '$(OS)' == 'Windows_NT' ">net462</TargetFrameworks>
<TargetFrameworks>$(TargetFrameworks);netstandard2.0;net6.0</TargetFrameworks>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace Serilog.Sinks.PeriodicBatching;
/// <summary>
/// Buffers log events into batches for background flushing.
/// </summary>
public sealed class PeriodicBatchingSink : ILogEventSink, IDisposable
public class PeriodicBatchingSink : ILogEventSink, IDisposable, IBatchedLogEventSink
#if FEATURE_ASYNCDISPOSABLE
, IAsyncDisposable
#endif
Expand All @@ -43,12 +43,19 @@ public sealed class PeriodicBatchingSink : ILogEventSink, IDisposable
readonly Task _runLoop;

// Used only by the read side.
readonly IBatchedLogEventSink _targetSink;
readonly IBatchedLogEventSink _targetSink = null!;
readonly int _batchSizeLimit;
readonly bool _eagerlyEmitFirstEvent;
readonly FailureAwareBatchScheduler _batchScheduler;
readonly Queue<LogEvent> _currentBatch = new();
readonly Task _waitForShutdownSignal;
Task<bool>? _cachedWaitToRead;

/// <summary>
/// Constant used with legacy constructor to indicate that the internal queue shouldn't be limited.
/// </summary>
[Obsolete("Implement `IBatchedLogEventSink` and use the `PeriodicBatchingSinkOptions` constructor.")]
public const int NoQueueLimit = -1;

/// <summary>
/// Construct a <see cref="PeriodicBatchingSink"/>.
Expand All @@ -58,14 +65,62 @@ public sealed class PeriodicBatchingSink : ILogEventSink, IDisposable
/// it will dispose this object if possible.</param>
/// <param name="options">Options controlling behavior of the sink.</param>
public PeriodicBatchingSink(IBatchedLogEventSink batchedSink, PeriodicBatchingSinkOptions options)
: this(options)
{
_targetSink = batchedSink ?? throw new ArgumentNullException(nameof(batchedSink));
}

/// <summary>
/// Construct a <see cref="PeriodicBatchingSink"/>. New code should avoid subclassing
/// <see cref="PeriodicBatchingSink"/> and use
/// <see cref="PeriodicBatchingSink(Serilog.Sinks.PeriodicBatching.IBatchedLogEventSink,Serilog.Sinks.PeriodicBatching.PeriodicBatchingSinkOptions)"/>
/// instead.
/// </summary>
/// <param name="batchSizeLimit">The maximum number of events to include in a single batch.</param>
/// <param name="period">The time to wait between checking for event batches.</param>
[Obsolete("Implement `IBatchedLogEventSink` and use the `PeriodicBatchingSinkOptions` constructor.")]
protected PeriodicBatchingSink(int batchSizeLimit, TimeSpan period)
: this(new PeriodicBatchingSinkOptions
{
BatchSizeLimit = batchSizeLimit,
Period = period,
EagerlyEmitFirstEvent = true,
QueueLimit = null
})
{
_targetSink = this;
}

/// <summary>
/// Construct a <see cref="PeriodicBatchingSink"/>. New code should avoid subclassing
/// <see cref="PeriodicBatchingSink"/> and use
/// <see cref="PeriodicBatchingSink(Serilog.Sinks.PeriodicBatching.IBatchedLogEventSink,Serilog.Sinks.PeriodicBatching.PeriodicBatchingSinkOptions)"/>
/// instead.
/// </summary>
/// <param name="batchSizeLimit">The maximum number of events to include in a single batch.</param>
/// <param name="period">The time to wait between checking for event batches.</param>
/// <param name="queueLimit">Maximum number of events in the queue - use <see cref="NoQueueLimit"/> for an unbounded queue.</param>
[Obsolete("Implement `IBatchedLogEventSink` and use the `PeriodicBatchingSinkOptions` constructor.")]
protected PeriodicBatchingSink(int batchSizeLimit, TimeSpan period, int queueLimit)
: this(new PeriodicBatchingSinkOptions
{
BatchSizeLimit = batchSizeLimit,
Period = period,
EagerlyEmitFirstEvent = true,
QueueLimit = queueLimit == NoQueueLimit ? null : queueLimit
})
{
_targetSink = this;
}

PeriodicBatchingSink(PeriodicBatchingSinkOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
if (options.BatchSizeLimit <= 0)
throw new ArgumentOutOfRangeException(nameof(options), "The batch size limit must be greater than zero.");
if (options.Period <= TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(options), "The period must be greater than zero.");

_targetSink = batchedSink ?? throw new ArgumentNullException(nameof(batchedSink));
_batchSizeLimit = options.BatchSizeLimit;
_queue = options.QueueLimit is { } limit
? Channel.CreateBounded<LogEvent>(new BoundedChannelOptions(limit) { SingleReader = true })
Expand Down Expand Up @@ -117,7 +172,8 @@ async Task LoopAsync()
{
while (_currentBatch.Count < _batchSizeLimit &&
!_shutdownSignal.IsCancellationRequested &&
_queue.Reader.TryRead(out var next))
_queue.Reader.TryRead(out var next) &&
CanInclude(next))
{
_currentBatch.Enqueue(next);
}
Expand Down Expand Up @@ -205,22 +261,77 @@ async Task LoopAsync()
// Wait until `reader` has items to read. Returns `false` if the `timeout` task completes, or if the reader is cancelled.
async Task<bool> TryWaitToReadAsync(ChannelReader<LogEvent> reader, Task timeout, CancellationToken cancellationToken)
{
var completed = await Task.WhenAny(timeout, reader.WaitToReadAsync(cancellationToken).AsTask()).ConfigureAwait(false);
var waitToRead = _cachedWaitToRead ?? reader.WaitToReadAsync(cancellationToken).AsTask();
_cachedWaitToRead = null;

var completed = await Task.WhenAny(timeout, waitToRead).ConfigureAwait(false);

// Avoid unobserved task exceptions in the cancellation and failure cases. Note that we may not end up observing
// read task cancellation exceptions during shutdown, may be some room to improve.
if (completed is { Exception: not null, IsCanceled: false })
{
WriteToSelfLog($"could not read from queue: {completed.Exception}");
}

if (completed == timeout)
{
// Dropping references to `waitToRead` will cause it and some supporting objects to leak; disposing it
// will break the channel and cause future attempts to read to fail. So, we cache and reuse it next time
// around the loop.

// `Task.IsCompletedSuccessfully` not available in .NET Standard 2.0/Framework.
return completed != timeout && completed is { IsCompleted: true, IsCanceled: false, IsFaulted: false };
_cachedWaitToRead = waitToRead;
return false;
}

if (waitToRead.Status is not TaskStatus.RanToCompletion)
return false;

return await waitToRead;
}

/// <inheritdoc/>
/// <summary>
/// Emit a batch of log events, running to completion synchronously.
/// </summary>
/// <param name="events">The events to emit.</param>
/// <remarks>Override either <see cref="EmitBatch"/> or <see cref="EmitBatchAsync"/>,
/// not both.</remarks>
protected virtual void EmitBatch(IEnumerable<LogEvent> events)
{
}

/// <summary>
/// Emit a batch of log events, running asynchronously.
/// </summary>
/// <param name="events">The events to emit.</param>
/// <remarks>Override either <see cref="EmitBatchAsync"/> or <see cref="EmitBatch"/>,
/// not both. </remarks>
#pragma warning disable 1998
protected virtual async Task EmitBatchAsync(IEnumerable<LogEvent> events)
#pragma warning restore 1998
{
// ReSharper disable once MethodHasAsyncOverload
EmitBatch(events);
}

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <filterpriority>2</filterpriority>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

/// <summary>
/// Free resources held by the sink.
/// </summary>
/// <param name="disposing">If true, called because the object is being disposed; if false,
/// the object is being disposed from the finalizer.</param>
protected virtual void Dispose(bool disposing)
{
if (!disposing) return;

SignalShutdown();

try
Expand All @@ -234,6 +345,12 @@ public void Dispose()
WriteToSelfLog("caught exception during disposal", ex);
}

if (ReferenceEquals(_targetSink, this))
{
// The sink is being used in the obsolete inheritance-based mode.
return;
}

(_targetSink as IDisposable)?.Dispose();
}

Expand All @@ -254,10 +371,21 @@ public async ValueTask DisposeAsync()
WriteToSelfLog("caught exception during async disposal", ex);
}

if (ReferenceEquals(_targetSink, this))
{
// The sink is being used in the obsolete inheritance-based mode. Old sinks won't
// override something like `DisposeAsyncCore()`; we just forward to the synchronous
// `Dispose()` method to ensure whatever cleanup they do still occurs.
Dispose(true);
return;
}

if (_targetSink is IAsyncDisposable asyncDisposable)
await asyncDisposable.DisposeAsync().ConfigureAwait(false);
else
(_targetSink as IDisposable)?.Dispose();

GC.SuppressFinalize(this);
}
#endif

Expand All @@ -280,4 +408,43 @@ void WriteToSelfLog(string message, Exception? exception = null)
var ex = exception != null ? $"{Environment.NewLine}{exception}" : "";
SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}): {message}{ex}");
}

/// <summary>
/// Determine whether a queued log event should be included in the batch. If
/// an override returns false, the event will be dropped.
/// </summary>
/// <param name="logEvent">An event to test for inclusion.</param>
/// <returns>True if the event should be included in the batch; otherwise, false.</returns>
// ReSharper disable once UnusedParameter.Global
protected virtual bool CanInclude(LogEvent logEvent)
{
return true;
}

/// <summary>
/// Allows derived sinks to perform periodic work without requiring additional threads
/// or timers (thus avoiding additional flush/shut-down complexity).
/// </summary>
/// <remarks>Override either <see cref="OnEmptyBatch"/> or <see cref="OnEmptyBatchAsync"/>,
/// not both. </remarks>
protected virtual void OnEmptyBatch()
{
}

/// <summary>
/// Allows derived sinks to perform periodic work without requiring additional threads
/// or timers (thus avoiding additional flush/shut-down complexity).
/// </summary>
/// <remarks>Override either <see cref="OnEmptyBatchAsync"/> or <see cref="OnEmptyBatch"/>,
/// not both. </remarks>
#pragma warning disable 1998
protected virtual async Task OnEmptyBatchAsync()
#pragma warning restore 1998
{
// ReSharper disable once MethodHasAsyncOverload
OnEmptyBatch();
}

Task IBatchedLogEventSink.EmitBatchAsync(IEnumerable<LogEvent> batch) => EmitBatchAsync(batch);
Task IBatchedLogEventSink.OnEmptyBatchAsync() => OnEmptyBatchAsync();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#if FEATURE_ASYNCDISPOSABLE

using Serilog.Sinks.PeriodicBatching.Tests.Support;
using Serilog.Tests.Support;
using Xunit;

namespace Serilog.Sinks.PeriodicBatching.Tests;

public class BackwardsCompatibilityTests
{
static readonly TimeSpan TinyWait = TimeSpan.FromMilliseconds(200);

[Fact]
public void LegacyWhenAnEventIsEnqueuedItIsWrittenToABatchOnDispose()
{
var bs = new LegacyDisposeTrackingSink();
var evt = Some.InformationEvent();
bs.Emit(evt);
bs.Dispose();
var batch = Assert.Single(bs.Batches);
var batched = Assert.Single(batch);
Assert.Same(evt, batched);
Assert.True(bs.IsDisposed);
Assert.False(bs.WasCalledAfterDisposal);
}

[Fact]
public void LegacyWhenAnEventIsEnqueuedItIsWrittenToABatchOnTimer()
{
var bs = new LegacyDisposeTrackingSink();
var evt = Some.InformationEvent();
bs.Emit(evt);
Thread.Sleep(TinyWait + TinyWait);
bs.Stop();
bs.Dispose();
Assert.Single(bs.Batches);
Assert.True(bs.IsDisposed);
Assert.False(bs.WasCalledAfterDisposal);
}

[Fact]
public void LegacySinksAreDisposedWhenLoggerIsDisposed()
{
var sink = new LegacyDisposeTrackingSink();
var logger = new LoggerConfiguration().WriteTo.Sink(sink).CreateLogger();
logger.Dispose();
Assert.True(sink.IsDisposed);
}

[Fact]
public async Task LegacySinksAreDisposedWhenLoggerIsDisposedAsync()
{
var sink = new LegacyDisposeTrackingSink();
var logger = new LoggerConfiguration().WriteTo.Sink(sink).CreateLogger();
await logger.DisposeAsync();
Assert.True(sink.IsDisposed);
}
}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,16 @@ public void ExecutionContextDoesNotFlowToBatchedSink()
[InlineData(false)]
public async Task EagerlyEmitFirstEventCausesQuickInitialBatch(bool eagerlyEmit)
{
long batchesEmitted = 0;
var x = new ManualResetEvent(false);
var bs = new CallbackBatchedSink(_ =>
{
// ReSharper disable once AccessToModifiedClosure
Interlocked.Increment(ref batchesEmitted);
x.Set();
return Task.CompletedTask;
});

var options = new PeriodicBatchingSinkOptions
{
Period = TimeSpan.FromSeconds(2),
Period = TimeSpan.FromSeconds(3),
EagerlyEmitFirstEvent = eagerlyEmit,
BatchSizeLimit = 10,
QueueLimit = 1000
Expand All @@ -137,14 +136,14 @@ public async Task EagerlyEmitFirstEventCausesQuickInitialBatch(bool eagerlyEmit)
pbs.Emit(evt);

await Task.Delay(1900);
Assert.Equal(eagerlyEmit ? 1L : 0, Interlocked.Read(ref batchesEmitted));
Assert.Equal(eagerlyEmit, x.WaitOne(0));

#if FEATURE_ASYNCDISPOSABLE
await pbs.DisposeAsync();
#else
pbs.Dispose();
#endif

Assert.Equal(1L, Interlocked.Read(ref batchesEmitted));
Assert.True(x.WaitOne(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public Task EmitBatchAsync(IEnumerable<LogEvent> events)
lock (_stateLock)
{
if (_stopped)
return Task.FromResult(0);
return Task.CompletedTask;

if (IsDisposed)
WasCalledAfterDisposal = true;
Expand All @@ -37,10 +37,10 @@ public Task EmitBatchAsync(IEnumerable<LogEvent> events)
Batches.Add(events.ToList());
}

return Task.FromResult(0);
return Task.CompletedTask;
}

public Task OnEmptyBatchAsync() => Task.FromResult(0);
public Task OnEmptyBatchAsync() => Task.CompletedTask;

public void Dispose()
{
Expand Down
Loading

0 comments on commit d7b0406

Please sign in to comment.