Skip to content

Commit

Permalink
Add MetricsInterval to queues to control how often queue metrics are …
Browse files Browse the repository at this point in the history
…retrieved
  • Loading branch information
ejsmith committed Dec 18, 2024
1 parent 38efe12 commit 6c77408
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/Foundatio.TestHarness/Queue/QueueTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ public virtual async Task CanRunWorkItemWithMetricsAsync()
{
int completedCount = 0;

using var queue = new InMemoryQueue<WorkItemData>(o => o.LoggerFactory(Log));
using var queue = new InMemoryQueue<WorkItemData>(o => o.LoggerFactory(Log).MetricsInterval(TimeSpan.Zero));

Task Handler(object sender, CompletedEventArgs<WorkItemData> e)
{
Expand Down
15 changes: 11 additions & 4 deletions src/Foundatio/Queues/QueueBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public abstract class QueueBase<T, TOptions> : MaintenanceBase, IQueue<T>, IHave
private readonly List<IQueueBehavior<T>> _behaviors = new();
protected readonly CancellationTokenSource _queueDisposedCancellationTokenSource;
private bool _isDisposed;
private QueueStats _queueStats;
private DateTime _nextQueueStatsUpdate = DateTime.MinValue;

protected QueueBase(TOptions options) : base(options?.TimeProvider, options?.LoggerFactory)
{
Expand All @@ -62,11 +64,15 @@ protected QueueBase(TOptions options) : base(options?.TimeProvider, options?.Log

var queueMetricValues = new InstrumentsValues<long, long, long>(() =>
{
if (options.MetricsInterval > TimeSpan.Zero && _nextQueueStatsUpdate >= _timeProvider.GetUtcNow())
return (_queueStats.Queued, _queueStats.Working, _queueStats.Deadletter);

_nextQueueStatsUpdate = _timeProvider.GetUtcNow().UtcDateTime.Add(_options.MetricsInterval);
try
{
using var _ = FoundatioDiagnostics.ActivitySource.StartActivity("Queue Stats: " + _options.Name);
var stats = GetMetricsQueueStats();
return (stats.Queued, stats.Working, stats.Deadletter);
_queueStats = GetMetricsQueueStats();
return (_queueStats.Queued, _queueStats.Working, _queueStats.Deadletter);
}
catch
{
Expand Down Expand Up @@ -138,9 +144,10 @@ public async Task<IEnumerable<T>> GetDeadletterItemsAsync(CancellationToken canc

protected abstract Task<QueueStats> GetQueueStatsImplAsync();

public Task<QueueStats> GetQueueStatsAsync()
public async Task<QueueStats> GetQueueStatsAsync()
{
return GetQueueStatsImplAsync();
_queueStats = await GetQueueStatsImplAsync();
return _queueStats;
}

protected virtual QueueStats GetMetricsQueueStats()
Expand Down
17 changes: 17 additions & 0 deletions src/Foundatio/Queues/SharedQueueOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ public class SharedQueueOptions<T> : SharedOptions where T : class
/// Allows you to set a prefix on queue metrics. This allows you to have unique metrics for keyed queues (e.g., priority queues).
/// </summary>
public string MetricsPrefix { get; set; }

/// <summary>
/// How often to update queue metrics. Defaults to 30 seconds.
/// </summary>
public TimeSpan MetricsInterval { get; set; } = TimeSpan.FromSeconds(30);
}

public class SharedQueueOptionsBuilder<T, TOptions, TBuilder> : SharedOptionsBuilder<TOptions, TBuilder>
Expand Down Expand Up @@ -77,4 +82,16 @@ public TBuilder MetricsPrefix(string prefix)
Target.MetricsPrefix = prefix.Trim();
return (TBuilder)this;
}

/// <summary>
/// How often to update queue metrics. Defaults to 30 seconds.
/// </summary>
public TBuilder MetricsInterval(TimeSpan interval)
{
if (interval < TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(interval));

Target.MetricsInterval = interval;
return (TBuilder)this;
}
}
1 change: 1 addition & 0 deletions tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ protected override IQueue<SimpleWorkItem> GetQueue(int retries = 1, TimeSpan? wo
.RetryMultipliers(retryMultipliers ?? new[] { 1, 3, 5, 10 })
.WorkItemTimeout(workItemTimeout.GetValueOrDefault(TimeSpan.FromMinutes(5)))
.TimeProvider(timeProvider)
.MetricsInterval(TimeSpan.Zero)
.LoggerFactory(Log));
if (_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("Queue Id: {QueueId}", _queue.QueueId);
Expand Down

0 comments on commit 6c77408

Please sign in to comment.