Skip to content

Commit

Permalink
Ensure queue start working methods are shut down properly.
Browse files Browse the repository at this point in the history
  • Loading branch information
niemyjski committed Mar 20, 2024
1 parent be0d1fd commit eb3d940
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 10 deletions.
34 changes: 26 additions & 8 deletions src/Foundatio.TestHarness/Queue/QueueTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ public virtual Task VerifyDelayedRetryAttemptsAsync()

private async Task VerifyRetryAttemptsImplAsync(IQueue<SimpleWorkItem> queue, int retryCount, TimeSpan waitTime)
{
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));

try
{
await queue.DeleteQueueAsync();
Expand All @@ -367,6 +369,7 @@ private async Task VerifyRetryAttemptsImplAsync(IQueue<SimpleWorkItem> queue, in

var countdown = new AsyncCountdownEvent(retryCount + 1);
int attempts = 0;

await queue.StartWorkingAsync(async w =>
{
Interlocked.Increment(ref attempts);
Expand All @@ -380,7 +383,7 @@ await queue.StartWorkingAsync(async w =>
countdown.Signal();

_logger.LogInformation("Finished Attempt {Attempt} to work on queue item, Metadata Attempts: {MetadataAttempts}", attempts, queueEntryMetadata.Attempts);
});
}, cancellationToken: cancellationTokenSource.Token);

await queue.EnqueueAsync(new SimpleWorkItem
{
Expand Down Expand Up @@ -410,6 +413,7 @@ await queue.EnqueueAsync(new SimpleWorkItem
}
finally
{
await cancellationTokenSource.CancelAsync();
await CleanupQueueAsync(queue);
}
}
Expand Down Expand Up @@ -716,6 +720,7 @@ public virtual async Task CanUseQueueWorkerAsync()
if (queue == null)
return;

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
try
{
await queue.DeleteQueueAsync();
Expand All @@ -727,7 +732,7 @@ await queue.StartWorkingAsync(async w =>
Assert.Equal("Hello", w.Value.Data);
await w.CompleteAsync();
resetEvent.Set();
});
}, cancellationToken: cancellationTokenSource.Token);

await queue.EnqueueAsync(new SimpleWorkItem
{
Expand All @@ -745,6 +750,7 @@ await queue.EnqueueAsync(new SimpleWorkItem
}
finally
{
await cancellationTokenSource.CancelAsync();
await CleanupQueueAsync(queue);
}
}
Expand All @@ -755,6 +761,7 @@ public virtual async Task CanHandleErrorInWorkerAsync()
if (queue == null)
return;

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
try
{
await queue.DeleteQueueAsync();
Expand All @@ -770,7 +777,7 @@ await queue.StartWorkingAsync(w =>
_logger.LogDebug("WorkAction");
Assert.Equal("Hello", w.Value.Data);
throw new Exception();
});
}, cancellationToken: cancellationTokenSource.Token);

var resetEvent = new AsyncManualResetEvent(false);
using (queue.Abandoned.AddSyncHandler((o, args) => resetEvent.Set()))
Expand All @@ -793,6 +800,7 @@ await queue.StartWorkingAsync(w =>
}
finally
{
await cancellationTokenSource.CancelAsync();
await CleanupQueueAsync(queue);
}
}
Expand Down Expand Up @@ -899,6 +907,7 @@ public virtual async Task CanAutoCompleteWorkerAsync()
if (queue == null)
return;

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
try
{
await queue.DeleteQueueAsync();
Expand All @@ -909,7 +918,7 @@ await queue.StartWorkingAsync(w =>
{
Assert.Equal("Hello", w.Value.Data);
return Task.CompletedTask;
}, true);
}, true, cancellationTokenSource.Token);

using (queue.Completed.AddSyncHandler((s, e) => { resetEvent.Set(); }))
{
Expand All @@ -929,6 +938,7 @@ await queue.StartWorkingAsync(w =>
}
finally
{
await cancellationTokenSource.CancelAsync();
await CleanupQueueAsync(queue);
}
}
Expand All @@ -939,6 +949,7 @@ public virtual async Task CanHaveMultipleQueueInstancesAsync()
if (queue == null)
return;

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
try
{
await queue.DeleteQueueAsync();
Expand All @@ -956,7 +967,7 @@ public virtual async Task CanHaveMultipleQueueInstancesAsync()
{
var q = GetQueue(retries: 0, retryDelay: TimeSpan.Zero);
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Queue Id: {Id}, I: {Instance}", q.QueueId, i);
await q.StartWorkingAsync(w => DoWorkAsync(w, countdown, info));
await q.StartWorkingAsync(w => DoWorkAsync(w, countdown, info), cancellationToken: cancellationTokenSource.Token);
workers.Add(q);
}

Expand Down Expand Up @@ -1011,6 +1022,7 @@ await Run.InParallelAsync(workItemCount, async i =>
}
finally
{
await cancellationTokenSource.CancelAsync();
foreach (var q in workers)
await CleanupQueueAsync(q);
}
Expand Down Expand Up @@ -1316,6 +1328,7 @@ protected async Task CanDequeueWithLockingImpAsync(CacheLockProvider distributed
if (queue == null)
return;

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
try
{
await queue.DeleteQueueAsync();
Expand All @@ -1340,7 +1353,7 @@ await queue.StartWorkingAsync(async w =>

await w.CompleteAsync();
resetEvent.Set();
});
}, cancellationToken: cancellationTokenSource.Token);

await queue.EnqueueAsync(new SimpleWorkItem { Data = "Hello" });
await resetEvent.WaitAsync(TimeSpan.FromSeconds(5));
Expand All @@ -1355,6 +1368,7 @@ await queue.StartWorkingAsync(async w =>
}
finally
{
await cancellationTokenSource.CancelAsync();
await CleanupQueueAsync(queue);
}
}
Expand All @@ -1374,6 +1388,7 @@ protected async Task CanHaveMultipleQueueInstancesWithLockingImplAsync(CacheLock
if (queue == null)
return;

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
try
{
await queue.DeleteQueueAsync();
Expand Down Expand Up @@ -1409,7 +1424,7 @@ await q.StartWorkingAsync(async w =>
countdown.Signal();
if (_logger.IsEnabled(LogLevel.Information))
_logger.LogInformation("[{Instance}] Signaled countdown: {Id}", instanceCount, w.Id);
});
}, cancellationToken: cancellationTokenSource.Token);
workers.Add(q);
}

Expand Down Expand Up @@ -1453,6 +1468,7 @@ await Run.InParallelAsync(workItemCount, async i =>
}
finally
{
await cancellationTokenSource.CancelAsync();
foreach (var q in workers)
await CleanupQueueAsync(q);
}
Expand Down Expand Up @@ -1571,6 +1587,7 @@ public virtual async Task CanHandleAutoAbandonInWorker()
if (queue == null)
return;

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
try
{
await queue.DeleteQueueAsync();
Expand Down Expand Up @@ -1607,7 +1624,7 @@ await queue.StartWorkingAsync(async (item) =>
}

successEvent.Set();
});
}, cancellationToken: cancellationTokenSource.Token);

await queue.EnqueueAsync(new SimpleWorkItem() { Data = "Delay" });
await queue.EnqueueAsync(new SimpleWorkItem() { Data = "No Delay" });
Expand All @@ -1617,6 +1634,7 @@ await queue.StartWorkingAsync(async (item) =>
}
finally
{
await cancellationTokenSource.CancelAsync();
await CleanupQueueAsync(queue);
}
}
Expand Down
6 changes: 4 additions & 2 deletions tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Foundatio.Queues;
using Foundatio.Utility;
Expand Down Expand Up @@ -385,6 +386,7 @@ public virtual async Task CompleteOnAutoAbandonedHandledProperly_Issue239()
{
// create queue with short work item timeout so it will be auto abandoned
var queue = new InMemoryQueue_Issue239<SimpleWorkItem>(Log);
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));

// completion source to wait for CompleteAsync call before the assert
var taskCompletionSource = new TaskCompletionSource<bool>();
Expand All @@ -409,7 +411,7 @@ await queue.StartWorkingAsync(async (item) =>
// infrastructure handles user exception incorrectly
taskCompletionSource.SetResult(true);
}
});
}, cancellationToken: cancellationTokenSource.Token);

// enqueue item which will be processed after it's auto abandoned
await queue.EnqueueAsync(new SimpleWorkItem() { Data = "Delay" });
Expand Down

0 comments on commit eb3d940

Please sign in to comment.