Skip to content

Commit

Permalink
Merge pull request #75 from nblumhardt/reinstate-inheritance-api
Browse files Browse the repository at this point in the history
Reinstate the legacy inheritance-based API - again
  • Loading branch information
nblumhardt authored May 1, 2024
2 parents 54daef1 + 6df936a commit 9bb1f1c
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<Description>Buffer batches of log events to be flushed asynchronously.</Description>
<VersionPrefix>4.0.2</VersionPrefix>
<VersionPrefix>4.1.0</VersionPrefix>
<Authors>Serilog Contributors</Authors>
<TargetFrameworks Condition=" '$(OS)' == 'Windows_NT' ">net462</TargetFrameworks>
<TargetFrameworks>$(TargetFrameworks);netstandard2.0;net6.0</TargetFrameworks>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace Serilog.Sinks.PeriodicBatching;
/// <summary>
/// Buffers log events into batches for background flushing.
/// </summary>
public sealed class PeriodicBatchingSink : ILogEventSink, IDisposable
public class PeriodicBatchingSink : ILogEventSink, IDisposable, IBatchedLogEventSink
#if FEATURE_ASYNCDISPOSABLE
, IAsyncDisposable
#endif
Expand All @@ -43,13 +43,19 @@ public sealed class PeriodicBatchingSink : ILogEventSink, IDisposable
readonly Task _runLoop;

// Used only by the read side.
readonly IBatchedLogEventSink _targetSink;
readonly IBatchedLogEventSink _targetSink = null!;
readonly int _batchSizeLimit;
readonly bool _eagerlyEmitFirstEvent;
readonly FailureAwareBatchScheduler _batchScheduler;
readonly Queue<LogEvent> _currentBatch = new();
readonly Task _waitForShutdownSignal;
Task<bool>? _cachedWaitToRead;

/// <summary>
/// Constant used with legacy constructor to indicate that the internal queue shouldn't be limited.
/// </summary>
[Obsolete("Implement `IBatchedLogEventSink` and use the `PeriodicBatchingSinkOptions` constructor.")]
public const int NoQueueLimit = -1;

/// <summary>
/// Construct a <see cref="PeriodicBatchingSink"/>.
Expand All @@ -59,14 +65,62 @@ public sealed class PeriodicBatchingSink : ILogEventSink, IDisposable
/// it will dispose this object if possible.</param>
/// <param name="options">Options controlling behavior of the sink.</param>
public PeriodicBatchingSink(IBatchedLogEventSink batchedSink, PeriodicBatchingSinkOptions options)
: this(options)
{
_targetSink = batchedSink ?? throw new ArgumentNullException(nameof(batchedSink));
}

/// <summary>
/// Construct a <see cref="PeriodicBatchingSink"/>. New code should avoid subclassing
/// <see cref="PeriodicBatchingSink"/> and use
/// <see cref="PeriodicBatchingSink(Serilog.Sinks.PeriodicBatching.IBatchedLogEventSink,Serilog.Sinks.PeriodicBatching.PeriodicBatchingSinkOptions)"/>
/// instead.
/// </summary>
/// <param name="batchSizeLimit">The maximum number of events to include in a single batch.</param>
/// <param name="period">The time to wait between checking for event batches.</param>
[Obsolete("Implement `IBatchedLogEventSink` and use the `PeriodicBatchingSinkOptions` constructor.")]
protected PeriodicBatchingSink(int batchSizeLimit, TimeSpan period)
: this(new PeriodicBatchingSinkOptions
{
BatchSizeLimit = batchSizeLimit,
Period = period,
EagerlyEmitFirstEvent = true,
QueueLimit = null
})
{
_targetSink = this;
}

/// <summary>
/// Construct a <see cref="PeriodicBatchingSink"/>. New code should avoid subclassing
/// <see cref="PeriodicBatchingSink"/> and use
/// <see cref="PeriodicBatchingSink(Serilog.Sinks.PeriodicBatching.IBatchedLogEventSink,Serilog.Sinks.PeriodicBatching.PeriodicBatchingSinkOptions)"/>
/// instead.
/// </summary>
/// <param name="batchSizeLimit">The maximum number of events to include in a single batch.</param>
/// <param name="period">The time to wait between checking for event batches.</param>
/// <param name="queueLimit">Maximum number of events in the queue - use <see cref="NoQueueLimit"/> for an unbounded queue.</param>
[Obsolete("Implement `IBatchedLogEventSink` and use the `PeriodicBatchingSinkOptions` constructor.")]
protected PeriodicBatchingSink(int batchSizeLimit, TimeSpan period, int queueLimit)
: this(new PeriodicBatchingSinkOptions
{
BatchSizeLimit = batchSizeLimit,
Period = period,
EagerlyEmitFirstEvent = true,
QueueLimit = queueLimit == NoQueueLimit ? null : queueLimit
})
{
_targetSink = this;
}

PeriodicBatchingSink(PeriodicBatchingSinkOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
if (options.BatchSizeLimit <= 0)
throw new ArgumentOutOfRangeException(nameof(options), "The batch size limit must be greater than zero.");
if (options.Period <= TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(options), "The period must be greater than zero.");

_targetSink = batchedSink ?? throw new ArgumentNullException(nameof(batchedSink));
_batchSizeLimit = options.BatchSizeLimit;
_queue = options.QueueLimit is { } limit
? Channel.CreateBounded<LogEvent>(new BoundedChannelOptions(limit) { SingleReader = true })
Expand Down Expand Up @@ -118,7 +172,8 @@ async Task LoopAsync()
{
while (_currentBatch.Count < _batchSizeLimit &&
!_shutdownSignal.IsCancellationRequested &&
_queue.Reader.TryRead(out var next))
_queue.Reader.TryRead(out var next) &&
CanInclude(next))
{
_currentBatch.Enqueue(next);
}
Expand Down Expand Up @@ -234,9 +289,49 @@ async Task<bool> TryWaitToReadAsync(ChannelReader<LogEvent> reader, Task timeout
return await waitToRead;
}

/// <inheritdoc/>
/// <summary>
/// Emit a batch of log events, running to completion synchronously.
/// </summary>
/// <param name="events">The events to emit.</param>
/// <remarks>Override either <see cref="EmitBatch"/> or <see cref="EmitBatchAsync"/>,
/// not both.</remarks>
protected virtual void EmitBatch(IEnumerable<LogEvent> events)
{
}

/// <summary>
/// Emit a batch of log events, running asynchronously.
/// </summary>
/// <param name="events">The events to emit.</param>
/// <remarks>Override either <see cref="EmitBatchAsync"/> or <see cref="EmitBatch"/>,
/// not both. </remarks>
#pragma warning disable 1998
protected virtual async Task EmitBatchAsync(IEnumerable<LogEvent> events)
#pragma warning restore 1998
{
// ReSharper disable once MethodHasAsyncOverload
EmitBatch(events);
}

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <filterpriority>2</filterpriority>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

/// <summary>
/// Free resources held by the sink.
/// </summary>
/// <param name="disposing">If true, called because the object is being disposed; if false,
/// the object is being disposed from the finalizer.</param>
protected virtual void Dispose(bool disposing)
{
if (!disposing) return;

SignalShutdown();

try
Expand All @@ -250,6 +345,12 @@ public void Dispose()
WriteToSelfLog("caught exception during disposal", ex);
}

if (ReferenceEquals(_targetSink, this))
{
// The sink is being used in the obsolete inheritance-based mode.
return;
}

(_targetSink as IDisposable)?.Dispose();
}

Expand All @@ -270,10 +371,21 @@ public async ValueTask DisposeAsync()
WriteToSelfLog("caught exception during async disposal", ex);
}

if (ReferenceEquals(_targetSink, this))
{
// The sink is being used in the obsolete inheritance-based mode. Old sinks won't
// override something like `DisposeAsyncCore()`; we just forward to the synchronous
// `Dispose()` method to ensure whatever cleanup they do still occurs.
Dispose(true);
return;
}

if (_targetSink is IAsyncDisposable asyncDisposable)
await asyncDisposable.DisposeAsync().ConfigureAwait(false);
else
(_targetSink as IDisposable)?.Dispose();

GC.SuppressFinalize(this);
}
#endif

Expand All @@ -296,4 +408,43 @@ void WriteToSelfLog(string message, Exception? exception = null)
var ex = exception != null ? $"{Environment.NewLine}{exception}" : "";
SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}): {message}{ex}");
}

/// <summary>
/// Determine whether a queued log event should be included in the batch. If
/// an override returns false, the event will be dropped.
/// </summary>
/// <param name="logEvent">An event to test for inclusion.</param>
/// <returns>True if the event should be included in the batch; otherwise, false.</returns>
// ReSharper disable once UnusedParameter.Global
protected virtual bool CanInclude(LogEvent logEvent)
{
return true;
}

/// <summary>
/// Allows derived sinks to perform periodic work without requiring additional threads
/// or timers (thus avoiding additional flush/shut-down complexity).
/// </summary>
/// <remarks>Override either <see cref="OnEmptyBatch"/> or <see cref="OnEmptyBatchAsync"/>,
/// not both. </remarks>
protected virtual void OnEmptyBatch()
{
}

/// <summary>
/// Allows derived sinks to perform periodic work without requiring additional threads
/// or timers (thus avoiding additional flush/shut-down complexity).
/// </summary>
/// <remarks>Override either <see cref="OnEmptyBatchAsync"/> or <see cref="OnEmptyBatch"/>,
/// not both. </remarks>
#pragma warning disable 1998
protected virtual async Task OnEmptyBatchAsync()
#pragma warning restore 1998
{
// ReSharper disable once MethodHasAsyncOverload
OnEmptyBatch();
}

Task IBatchedLogEventSink.EmitBatchAsync(IEnumerable<LogEvent> batch) => EmitBatchAsync(batch);
Task IBatchedLogEventSink.OnEmptyBatchAsync() => OnEmptyBatchAsync();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#if FEATURE_ASYNCDISPOSABLE

using Serilog.Sinks.PeriodicBatching.Tests.Support;
using Serilog.Tests.Support;
using Xunit;

namespace Serilog.Sinks.PeriodicBatching.Tests;

public class BackwardsCompatibilityTests
{
static readonly TimeSpan TinyWait = TimeSpan.FromMilliseconds(200);

[Fact]
public void LegacyWhenAnEventIsEnqueuedItIsWrittenToABatchOnDispose()
{
var bs = new LegacyDisposeTrackingSink();
var evt = Some.InformationEvent();
bs.Emit(evt);
bs.Dispose();
var batch = Assert.Single(bs.Batches);
var batched = Assert.Single(batch);
Assert.Same(evt, batched);
Assert.True(bs.IsDisposed);
Assert.False(bs.WasCalledAfterDisposal);
}

[Fact]
public void LegacyWhenAnEventIsEnqueuedItIsWrittenToABatchOnTimer()
{
var bs = new LegacyDisposeTrackingSink();
var evt = Some.InformationEvent();
bs.Emit(evt);
Thread.Sleep(TinyWait + TinyWait);
bs.Stop();
bs.Dispose();
Assert.Single(bs.Batches);
Assert.True(bs.IsDisposed);
Assert.False(bs.WasCalledAfterDisposal);
}

[Fact]
public void LegacySinksAreDisposedWhenLoggerIsDisposed()
{
var sink = new LegacyDisposeTrackingSink();
var logger = new LoggerConfiguration().WriteTo.Sink(sink).CreateLogger();
logger.Dispose();
Assert.True(sink.IsDisposed);
}

[Fact]
public async Task LegacySinksAreDisposedWhenLoggerIsDisposedAsync()
{
var sink = new LegacyDisposeTrackingSink();
var logger = new LoggerConfiguration().WriteTo.Sink(sink).CreateLogger();
await logger.DisposeAsync();
Assert.True(sink.IsDisposed);
}
}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public Task EmitBatchAsync(IEnumerable<LogEvent> events)
lock (_stateLock)
{
if (_stopped)
return Task.FromResult(0);
return Task.CompletedTask;

if (IsDisposed)
WasCalledAfterDisposal = true;
Expand All @@ -37,10 +37,10 @@ public Task EmitBatchAsync(IEnumerable<LogEvent> events)
Batches.Add(events.ToList());
}

return Task.FromResult(0);
return Task.CompletedTask;
}

public Task OnEmptyBatchAsync() => Task.FromResult(0);
public Task OnEmptyBatchAsync() => Task.CompletedTask;

public void Dispose()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using Serilog.Events;

#pragma warning disable CS0618

namespace Serilog.Sinks.PeriodicBatching.Tests.Support;

public class LegacyDisposeTrackingSink()
: PeriodicBatchingSink(10, TimeSpan.FromMinutes(1))
{
readonly object _stateLock = new();
bool _stopped;

// Postmortem only
public bool WasCalledAfterDisposal { get; private set; }
public bool IsDisposed { get; private set; }
public IList<IList<LogEvent>> Batches { get; } = new List<IList<LogEvent>>();

public void Stop()
{
lock (_stateLock)
{
_stopped = true;
}
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);

lock (_stateLock)
{
IsDisposed = true;
}
}

protected override void EmitBatch(IEnumerable<LogEvent> events)
{
lock (_stateLock)
{
if (_stopped)
return;

if (IsDisposed)
WasCalledAfterDisposal = true;

Batches.Add(events.ToList());
}
}
}

0 comments on commit 9bb1f1c

Please sign in to comment.