-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathMutexWorkflow.workflow.cs
60 lines (47 loc) · 1.65 KB
/
MutexWorkflow.workflow.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
namespace TemporalioSamples.Mutex.Impl;
using Temporalio.Workflows;
/// <summary>
/// Input payload for <see cref="MutexWorkflow"/>.
/// </summary>
/// <param name="InitialRequests">
/// Lock requests that the workflow places on its queue before it begins processing.
/// </param>
internal record MutexWorkflowInput(IReadOnlyCollection<LockRequest> InitialRequests)
{
    /// <summary>
    /// Shared instance that carries no initial lock requests.
    /// </summary>
    public static readonly MutexWorkflowInput Empty =
        new MutexWorkflowInput(Array.Empty<LockRequest>());
}
/// <summary>
/// Workflow that queues incoming <see cref="LockRequest"/>s and hands them, one at a time and in
/// arrival order, to an <see cref="ILockHandler"/>.
/// </summary>
[Workflow]
internal class MutexWorkflow
{
    private readonly ILockHandler lockHandler = WorkflowMutex.CreateLockHandler();

    // FIFO queue of requests that have been received but not yet handed to the lock handler.
    private readonly Queue<LockRequest> requests = new();

    /// <summary>
    /// Workflow entry point. Seeds the queue from <paramref name="input"/>, then processes requests
    /// until the server suggests continue-as-new.
    /// </summary>
    /// <param name="input">Carries the requests to enqueue before processing starts.</param>
    /// <returns>
    /// A task that completes when continue-as-new has been suggested and no requests are pending.
    /// If requests are still pending at that point, a continue-as-new exception is thrown instead so
    /// that they are passed to a fresh run.
    /// </returns>
    [WorkflowRun]
    public async Task RunAsync(MutexWorkflowInput input)
    {
        var logger = Workflow.Logger;

        foreach (var request in input.InitialRequests)
        {
            requests.Enqueue(request);
        }

        while (!Workflow.ContinueAsNewSuggested)
        {
            if (requests.Count == 0)
            {
                logger.LogInformation("No lock requests, waiting for more...");
                await Workflow.WaitConditionAsync(() => requests.Count > 0);
            }

            // Re-check the continue-as-new suggestion before every dequeue. Draining the queue
            // unconditionally meant the hand-off below could practically never see pending requests,
            // and a queue that keeps being refilled by signals would keep this run alive indefinitely.
            // NOTE(review): assumes HandleAsync returns only once a request is fully processed, so
            // stopping between requests is safe - confirm against ILockHandler.
            // NOTE(review): executions already recorded with the old draining behaviour may need
            // Workflow.Patched guarding on replay - confirm before deploying to in-flight workflows.
            while (!Workflow.ContinueAsNewSuggested && requests.TryDequeue(out var lockRequest))
            {
                await lockHandler.HandleAsync(lockRequest);
            }
        }

        // Carry any still-pending requests over to the next run; otherwise simply complete.
        if (requests.Count > 0)
        {
            var newInput = new MutexWorkflowInput(requests);
            throw Workflow.CreateContinueAsNewException((MutexWorkflow x) => x.RunAsync(newInput));
        }
    }

    /// <summary>
    /// Gets the current owner id as reported by the lock handler.
    /// </summary>
    [WorkflowQuery]
    public string? CurrentOwnerId => lockHandler.CurrentOwnerId;

    /// <summary>
    /// Signal handler that appends <paramref name="request"/> to the pending queue.
    /// </summary>
    /// <param name="request">The lock request to enqueue.</param>
    /// <returns>A completed task; the handler does no asynchronous work.</returns>
    [WorkflowSignal]
    public Task RequestLockAsync(LockRequest request)
    {
        requests.Enqueue(request);
        Workflow.Logger.LogInformation("Received lock request. (InitiatorId='{InitiatorId}')", request.InitiatorId);
        return Task.CompletedTask;
    }
}