diff --git a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs index dd36c6bf..66ad6dba 100644 --- a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs +++ b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs @@ -358,6 +358,8 @@ public virtual Task VerifyDelayedRetryAttemptsAsync() private async Task VerifyRetryAttemptsImplAsync(IQueue queue, int retryCount, TimeSpan waitTime) { + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + try { await queue.DeleteQueueAsync(); @@ -367,6 +369,7 @@ private async Task VerifyRetryAttemptsImplAsync(IQueue queue, in var countdown = new AsyncCountdownEvent(retryCount + 1); int attempts = 0; + await queue.StartWorkingAsync(async w => { Interlocked.Increment(ref attempts); @@ -380,7 +383,7 @@ await queue.StartWorkingAsync(async w => countdown.Signal(); _logger.LogInformation("Finished Attempt {Attempt} to work on queue item, Metadata Attempts: {MetadataAttempts}", attempts, queueEntryMetadata.Attempts); - }); + }, cancellationToken: cancellationTokenSource.Token); await queue.EnqueueAsync(new SimpleWorkItem { @@ -410,6 +413,7 @@ await queue.EnqueueAsync(new SimpleWorkItem } finally { + await cancellationTokenSource.CancelAsync(); await CleanupQueueAsync(queue); } } @@ -716,6 +720,7 @@ public virtual async Task CanUseQueueWorkerAsync() if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -727,7 +732,7 @@ await queue.StartWorkingAsync(async w => Assert.Equal("Hello", w.Value.Data); await w.CompleteAsync(); resetEvent.Set(); - }); + }, cancellationToken: cancellationTokenSource.Token); await queue.EnqueueAsync(new SimpleWorkItem { @@ -745,6 +750,7 @@ await queue.EnqueueAsync(new SimpleWorkItem } finally { + await cancellationTokenSource.CancelAsync(); await CleanupQueueAsync(queue); } } @@ -755,6 +761,7 @@ public virtual async Task CanHandleErrorInWorkerAsync() if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -770,7 +777,7 @@ await queue.StartWorkingAsync(w => _logger.LogDebug("WorkAction"); Assert.Equal("Hello", w.Value.Data); throw new Exception(); - }); + }, cancellationToken: cancellationTokenSource.Token); var resetEvent = new AsyncManualResetEvent(false); using (queue.Abandoned.AddSyncHandler((o, args) => resetEvent.Set())) @@ -793,6 +800,7 @@ await queue.StartWorkingAsync(w => } finally { + await cancellationTokenSource.CancelAsync(); await CleanupQueueAsync(queue); } } @@ -899,6 +907,7 @@ public virtual async Task CanAutoCompleteWorkerAsync() if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -909,7 +918,7 @@ await queue.StartWorkingAsync(w => { Assert.Equal("Hello", w.Value.Data); return Task.CompletedTask; - }, true); + }, true, cancellationTokenSource.Token); using (queue.Completed.AddSyncHandler((s, e) => { resetEvent.Set(); })) { @@ -929,6 +938,7 @@ await queue.StartWorkingAsync(w => } finally { + await cancellationTokenSource.CancelAsync(); await CleanupQueueAsync(queue); } } @@ -939,6 +949,7 @@ public virtual async Task CanHaveMultipleQueueInstancesAsync() if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -956,7 +967,7 @@ public virtual async Task CanHaveMultipleQueueInstancesAsync() { var q = GetQueue(retries: 0, retryDelay: TimeSpan.Zero); if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Queue Id: {Id}, I: {Instance}", q.QueueId, i); - await q.StartWorkingAsync(w => DoWorkAsync(w, countdown, info)); + await q.StartWorkingAsync(w => DoWorkAsync(w, countdown, info), cancellationToken: cancellationTokenSource.Token); workers.Add(q); } @@ -1011,6 +1022,7 @@ await Run.InParallelAsync(workItemCount, async i => } finally { + await cancellationTokenSource.CancelAsync(); foreach (var q in workers) await CleanupQueueAsync(q); } @@ -1316,6 +1328,7 @@ protected async Task CanDequeueWithLockingImpAsync(CacheLockProvider distributed if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -1340,7 +1353,7 @@ await queue.StartWorkingAsync(async w => await w.CompleteAsync(); resetEvent.Set(); - }); + }, cancellationToken: cancellationTokenSource.Token); await queue.EnqueueAsync(new SimpleWorkItem { Data = "Hello" }); await resetEvent.WaitAsync(TimeSpan.FromSeconds(5)); @@ -1355,6 +1368,7 @@ await queue.StartWorkingAsync(async w => } finally { + await cancellationTokenSource.CancelAsync(); await CleanupQueueAsync(queue); } } @@ -1374,6 +1388,7 @@ protected async Task CanHaveMultipleQueueInstancesWithLockingImplAsync(CacheLock if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -1409,7 +1424,7 @@ await q.StartWorkingAsync(async w => countdown.Signal(); if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("[{Instance}] Signaled countdown: {Id}", instanceCount, w.Id); - }); + }, cancellationToken: cancellationTokenSource.Token); workers.Add(q); } @@ -1453,6 +1468,7 @@ await Run.InParallelAsync(workItemCount, async i => } finally { + await cancellationTokenSource.CancelAsync(); foreach (var q in workers) await CleanupQueueAsync(q); } @@ -1571,6 +1587,7 @@ public virtual async Task CanHandleAutoAbandonInWorker() if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -1607,7 +1624,7 @@ await queue.StartWorkingAsync(async (item) => } successEvent.Set(); - }); + }, cancellationToken: cancellationTokenSource.Token); await queue.EnqueueAsync(new SimpleWorkItem() { Data = "Delay" }); await queue.EnqueueAsync(new SimpleWorkItem() { Data = "No Delay" }); @@ -1617,6 +1634,7 @@ await queue.StartWorkingAsync(async (item) => } finally { + await cancellationTokenSource.CancelAsync(); await CleanupQueueAsync(queue); } } diff --git a/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs b/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs index e3d339f2..7f15b561 100644 --- a/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs +++ b/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs @@ -1,6 +1,7 @@ -using System; +using System; using System.Collections.Generic; using System.Diagnostics; +using System.Threading; using System.Threading.Tasks; using Foundatio.Queues; using Foundatio.Utility; @@ -385,6 +386,7 @@ public virtual async Task CompleteOnAutoAbandonedHandledProperly_Issue239() { // create queue with short work item timeout so it will be auto abandoned var queue = new InMemoryQueue_Issue239(Log); + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); // completion source to wait for CompleteAsync call before the assert var taskCompletionSource = new TaskCompletionSource(); @@ -409,7 +411,7 @@ await queue.StartWorkingAsync(async (item) => // infrastructure handles user exception incorrectly taskCompletionSource.SetResult(true); } - }); + }, cancellationToken: cancellationTokenSource.Token); // enqueue item which will be processed after it's auto abandoned await queue.EnqueueAsync(new SimpleWorkItem() { Data = "Delay" });