diff --git a/Directory.Build.props b/Directory.Build.props
index 83111ee..c0d4184 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -19,13 +19,15 @@
-
-
-
-
+
+
+
+
- -->
+
+
+
diff --git a/README.md b/README.md
index d8163e8..f2957e9 100644
--- a/README.md
+++ b/README.md
@@ -25,6 +25,7 @@ Prerequisites:
* [ClientMtls](src/ClientMtls) - How to use client certificate authentication, e.g. for Temporal Cloud.
* [DependencyInjection](src/DependencyInjection) - How to inject dependencies in activities and use generic hosts for workers
* [Encryption](src/Encryption) - End-to-end encryption with Temporal payload codecs.
+* [Mutex](src/Mutex) - How to implement a mutex as a workflow. Demonstrates how to avoid race conditions or parallel mutually exclusive operations on the same resource.
* [Polling](src/Polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
* [Schedules](src/Schedules) - How to schedule workflows to be run at specific times in the future.
* [WorkerVersioning](src/WorkerVersioning) - How to use the Worker Versioning feature to more easily deploy changes to Workflow & other code.
diff --git a/TemporalioSamples.sln b/TemporalioSamples.sln
index 86885a5..16fb50f 100644
--- a/TemporalioSamples.sln
+++ b/TemporalioSamples.sln
@@ -47,6 +47,14 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Dependenc
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.WorkerVersioning", "src\WorkerVersioning\TemporalioSamples.WorkerVersioning.csproj", "{CA3FD1BC-C918-4B15-96F6-D6DDA125E63C}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Mutex", "src\Mutex\TemporalioSamples.Mutex.csproj", "{3168FB2D-D821-433A-A761-309E0474DE48}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Temporalio", "..\temporal-sdk-dotnet\src\Temporalio\Temporalio.csproj", "{D472F5AE-8C24-4FAB-A60C-667B7A8CDF75}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Temporalio.Extensions.Hosting", "..\temporal-sdk-dotnet\src\Temporalio.Extensions.Hosting\Temporalio.Extensions.Hosting.csproj", "{8B562D41-7525-42A1-928F-C21F1F22EF81}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Temporalio.Extensions.OpenTelemetry", "..\temporal-sdk-dotnet\src\Temporalio.Extensions.OpenTelemetry\Temporalio.Extensions.OpenTelemetry.csproj", "{40607923-C603-4CCB-9674-15894710D1AB}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -128,6 +136,22 @@ Global
{CA3FD1BC-C918-4B15-96F6-D6DDA125E63C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CA3FD1BC-C918-4B15-96F6-D6DDA125E63C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CA3FD1BC-C918-4B15-96F6-D6DDA125E63C}.Release|Any CPU.Build.0 = Release|Any CPU
+ {3168FB2D-D821-433A-A761-309E0474DE48}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {3168FB2D-D821-433A-A761-309E0474DE48}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {3168FB2D-D821-433A-A761-309E0474DE48}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {3168FB2D-D821-433A-A761-309E0474DE48}.Release|Any CPU.Build.0 = Release|Any CPU
+ {D472F5AE-8C24-4FAB-A60C-667B7A8CDF75}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {D472F5AE-8C24-4FAB-A60C-667B7A8CDF75}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {D472F5AE-8C24-4FAB-A60C-667B7A8CDF75}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {D472F5AE-8C24-4FAB-A60C-667B7A8CDF75}.Release|Any CPU.Build.0 = Release|Any CPU
+ {8B562D41-7525-42A1-928F-C21F1F22EF81}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {8B562D41-7525-42A1-928F-C21F1F22EF81}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {8B562D41-7525-42A1-928F-C21F1F22EF81}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {8B562D41-7525-42A1-928F-C21F1F22EF81}.Release|Any CPU.Build.0 = Release|Any CPU
+ {40607923-C603-4CCB-9674-15894710D1AB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {40607923-C603-4CCB-9674-15894710D1AB}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {40607923-C603-4CCB-9674-15894710D1AB}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {40607923-C603-4CCB-9674-15894710D1AB}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{7AECC7C6-9A21-4B8A-84D9-AFC4F5840CAF} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
@@ -150,5 +174,9 @@ Global
{11A5854B-EE6E-4752-9C46-F466503D853B} = {AE21E7F4-B114-4761-81B1-8FA63E9F6BB8}
{10E6F7C9-7F6C-4A8E-94A1-99C10F46BBA4} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{CA3FD1BC-C918-4B15-96F6-D6DDA125E63C} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
+ {3168FB2D-D821-433A-A761-309E0474DE48} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
+ {D472F5AE-8C24-4FAB-A60C-667B7A8CDF75} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
+ {8B562D41-7525-42A1-928F-C21F1F22EF81} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
+ {40607923-C603-4CCB-9674-15894710D1AB} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
EndGlobalSection
EndGlobal
diff --git a/src/Mutex/Activities.cs b/src/Mutex/Activities.cs
new file mode 100644
index 0000000..c50c3ca
--- /dev/null
+++ b/src/Mutex/Activities.cs
@@ -0,0 +1,32 @@
+using Temporalio.Activities;
+
+namespace TemporalioSamples.Mutex;
+
+public static class Activities
+{
+ [Activity]
+ public static void NotifyLocked(NotifyLockedInput input)
+ {
+ ActivityExecutionContext.Current.Logger.LogInformation(
+ "Lock for resource '{ResourceId}' acquired, release signal name '{ReleaseSignalName}'", input.ResourceId, input.ReleaseSignalName);
+ }
+
+ [Activity]
+ public static async Task UseApiThatCantBeCalledInParallelAsync(UseApiThatCantBeCalledInParallelInput input)
+ {
+ var logger = ActivityExecutionContext.Current.Logger;
+
+ logger.LogInformation("Sleeping for '{SleepFor}'...", input.SleepFor);
+
+ await Task.Delay(input.SleepFor);
+
+ logger.LogInformation("Done sleeping!");
+ }
+
+ [Activity]
+ public static void NotifyUnlocked(NotifyUnlockedInput input)
+ {
+ ActivityExecutionContext.Current.Logger.LogInformation(
+ "Lock for resource '{ResourceId}' released", input.ResourceId);
+ }
+}
diff --git a/src/Mutex/Impl/ILockHandle.cs b/src/Mutex/Impl/ILockHandle.cs
new file mode 100644
index 0000000..d95482a
--- /dev/null
+++ b/src/Mutex/Impl/ILockHandle.cs
@@ -0,0 +1,10 @@
+namespace TemporalioSamples.Mutex.Impl;
+
+public interface ILockHandle : IAsyncDisposable
+{
+ public string LockInitiatorId { get; }
+
+ public string ResourceId { get; }
+
+ public string ReleaseSignalName { get; }
+}
diff --git a/src/Mutex/Impl/Internal/AcquireLockInput.cs b/src/Mutex/Impl/Internal/AcquireLockInput.cs
new file mode 100644
index 0000000..b6d6e35
--- /dev/null
+++ b/src/Mutex/Impl/Internal/AcquireLockInput.cs
@@ -0,0 +1,3 @@
+namespace TemporalioSamples.Mutex.Impl.Internal;
+
+internal record AcquireLockInput(string ReleaseSignalName);
diff --git a/src/Mutex/Impl/Internal/ILockHandler.cs b/src/Mutex/Impl/Internal/ILockHandler.cs
new file mode 100644
index 0000000..88c01cc
--- /dev/null
+++ b/src/Mutex/Impl/Internal/ILockHandler.cs
@@ -0,0 +1,8 @@
+namespace TemporalioSamples.Mutex.Impl.Internal;
+
+internal interface ILockHandler
+{
+ public string? CurrentOwnerId { get; }
+
+ public Task HandleAsync(LockRequest lockRequest);
+}
diff --git a/src/Mutex/Impl/Internal/LockRequest.cs b/src/Mutex/Impl/Internal/LockRequest.cs
new file mode 100644
index 0000000..94a73a9
--- /dev/null
+++ b/src/Mutex/Impl/Internal/LockRequest.cs
@@ -0,0 +1,3 @@
+namespace TemporalioSamples.Mutex.Impl.Internal;
+
+internal record LockRequest(string InitiatorId, string ResourceId, string AcquireLockSignalName, TimeSpan? Timeout = null);
diff --git a/src/Mutex/Impl/Internal/MutexActivities.cs b/src/Mutex/Impl/Internal/MutexActivities.cs
new file mode 100644
index 0000000..8767482
--- /dev/null
+++ b/src/Mutex/Impl/Internal/MutexActivities.cs
@@ -0,0 +1,35 @@
+using Temporalio.Activities;
+using Temporalio.Client;
+using Temporalio.Workflows;
+
+namespace TemporalioSamples.Mutex.Impl.Internal;
+
+internal class MutexActivities
+{
+ private static readonly string RequestLockSignalName =
+ WorkflowSignalDefinition.FromMethod(
+ typeof(MutexWorkflow).GetMethod(nameof(MutexWorkflow.RequestLockAsync))
+ ?? throw new InvalidOperationException($"Method {nameof(MutexWorkflow.RequestLockAsync)} not found on type {typeof(MutexWorkflow)}"))
+ .Name ?? throw new InvalidOperationException("Signal name is null.");
+
+ private readonly ITemporalClient client;
+
+ public MutexActivities(ITemporalClient client)
+ {
+ this.client = client;
+ }
+
+ [Activity]
+ public async Task SignalWithStartMutexWorkflowAsync(SignalWithStartMutexWorkflowInput input)
+ {
+ var activityInfo = ActivityExecutionContext.Current.Info;
+
+ await this.client.StartWorkflowAsync(
+ (MutexWorkflow mw) => mw.RunAsync(MutexWorkflowInput.Empty),
+ new WorkflowOptions(input.MutexWorkflowId, activityInfo.TaskQueue)
+ {
+ StartSignal = RequestLockSignalName,
+ StartSignalArgs = new object[] { new LockRequest(activityInfo.WorkflowId, input.ResourceId, input.AcquireLockSignalName, input.LockTimeout), },
+ });
+ }
+}
diff --git a/src/Mutex/Impl/Internal/MutexWorkflow.workflow.cs b/src/Mutex/Impl/Internal/MutexWorkflow.workflow.cs
new file mode 100644
index 0000000..8d8c721
--- /dev/null
+++ b/src/Mutex/Impl/Internal/MutexWorkflow.workflow.cs
@@ -0,0 +1,55 @@
+using Temporalio.Workflows;
+
+namespace TemporalioSamples.Mutex.Impl.Internal;
+
+[Workflow]
+internal class MutexWorkflow
+{
+ private readonly ILockHandler lockHandler = WorkflowMutex.CreateLockHandler();
+ private readonly Queue requests = new();
+
+ [WorkflowRun]
+ public async Task RunAsync(MutexWorkflowInput input)
+ {
+ var logger = Workflow.Logger;
+
+ foreach (var request in input.InitialRequests)
+ {
+ requests.Enqueue(request);
+ }
+
+ while (!Workflow.ContinueAsNewSuggested)
+ {
+ if (requests.Count == 0)
+ {
+ logger.LogInformation("No lock requests, waiting for more...");
+
+ await Workflow.WaitConditionAsync(() => requests.Count > 0);
+ }
+
+ while (requests.TryDequeue(out var lockRequest))
+ {
+ await this.lockHandler.HandleAsync(lockRequest);
+ }
+ }
+
+ if (requests.Count > 0)
+ {
+ var newInput = new MutexWorkflowInput(requests);
+ throw Workflow.CreateContinueAsNewException((MutexWorkflow x) => x.RunAsync(newInput));
+ }
+ }
+
+ [WorkflowQuery]
+ public string? CurrentOwnerId => this.lockHandler.CurrentOwnerId;
+
+ [WorkflowSignal]
+ public Task RequestLockAsync(LockRequest request)
+ {
+ requests.Enqueue(request);
+
+ Workflow.Logger.LogInformation("Received lock request. (InitiatorId='{InitiatorId}')", request.InitiatorId);
+
+ return Task.CompletedTask;
+ }
+}
diff --git a/src/Mutex/Impl/Internal/MutexWorkflowInput.cs b/src/Mutex/Impl/Internal/MutexWorkflowInput.cs
new file mode 100644
index 0000000..33b6fea
--- /dev/null
+++ b/src/Mutex/Impl/Internal/MutexWorkflowInput.cs
@@ -0,0 +1,6 @@
+namespace TemporalioSamples.Mutex.Impl.Internal;
+
+internal record MutexWorkflowInput(IReadOnlyCollection InitialRequests)
+{
+ public static readonly MutexWorkflowInput Empty = new(Array.Empty());
+}
diff --git a/src/Mutex/Impl/Internal/SignalWithStartMutexWorkflowInput.cs b/src/Mutex/Impl/Internal/SignalWithStartMutexWorkflowInput.cs
new file mode 100644
index 0000000..8820733
--- /dev/null
+++ b/src/Mutex/Impl/Internal/SignalWithStartMutexWorkflowInput.cs
@@ -0,0 +1,3 @@
+namespace TemporalioSamples.Mutex.Impl.Internal;
+
+internal record SignalWithStartMutexWorkflowInput(string MutexWorkflowId, string ResourceId, string AcquireLockSignalName, TimeSpan? LockTimeout = null);
diff --git a/src/Mutex/Impl/TemporalWorkerOptionsExtensions.cs b/src/Mutex/Impl/TemporalWorkerOptionsExtensions.cs
new file mode 100644
index 0000000..1a6e98a
--- /dev/null
+++ b/src/Mutex/Impl/TemporalWorkerOptionsExtensions.cs
@@ -0,0 +1,19 @@
+using Temporalio.Client;
+using Temporalio.Worker;
+using TemporalioSamples.Mutex.Impl.Internal;
+
+namespace TemporalioSamples.Mutex.Impl;
+
+public static class TemporalWorkerOptionsExtensions
+{
+ public static TemporalWorkerOptions AddWorkflowMutex(this TemporalWorkerOptions options, ITemporalClient client)
+ {
+ var mutexActivities = new MutexActivities(client);
+
+ options
+ .AddAllActivities(mutexActivities)
+ .AddWorkflow();
+
+ return options;
+ }
+}
diff --git a/src/Mutex/Impl/WorkflowMutex.cs b/src/Mutex/Impl/WorkflowMutex.cs
new file mode 100644
index 0000000..d8fc2ce
--- /dev/null
+++ b/src/Mutex/Impl/WorkflowMutex.cs
@@ -0,0 +1,130 @@
+using Temporalio.Workflows;
+using TemporalioSamples.Mutex.Impl.Internal;
+
+namespace TemporalioSamples.Mutex.Impl;
+
+///
+/// Represents a mutual exclusion mechanism for Workflows.
+/// This part contains API for acquiring locks.
+///
+public static class WorkflowMutex
+{
+ public static async Task LockAsync(string resourceId, TimeSpan? lockTimeout = null)
+ {
+ if (!Workflow.InWorkflow)
+ {
+ throw new InvalidOperationException("Cannot acquire a lock outside of a workflow.");
+ }
+
+ var initiatorId = Workflow.Info.WorkflowId;
+ var lockStarted = Workflow.UtcNow;
+
+ string? releaseSignalName = null;
+ var acquireLockSignalName = Workflow.NewGuid().ToString();
+ var signalDefinition = WorkflowSignalDefinition.CreateWithoutAttribute(acquireLockSignalName, (AcquireLockInput input) =>
+ {
+ releaseSignalName = input.ReleaseSignalName;
+
+ return Task.CompletedTask;
+ });
+ Workflow.Signals[acquireLockSignalName] = signalDefinition;
+ try
+ {
+ var startMutexWorkflowInput = new SignalWithStartMutexWorkflowInput($"__wm-lock:{resourceId}", resourceId, acquireLockSignalName, lockTimeout);
+ await Workflow.ExecuteActivityAsync(
+ act => act.SignalWithStartMutexWorkflowAsync(startMutexWorkflowInput),
+ new ActivityOptions { StartToCloseTimeout = TimeSpan.FromMinutes(1), });
+
+ await Workflow.WaitConditionAsync(() => releaseSignalName != null);
+
+ var elapsed = Workflow.UtcNow - lockStarted;
+ Workflow.Logger.LogInformation(
+ "Lock for resource '{ResourceId}' acquired in {AcquireTime}ms by '{LockInitiatorId}', release signal name '{ReleaseSignalName}'",
+ resourceId,
+ (int)elapsed.TotalMilliseconds,
+ initiatorId,
+ releaseSignalName);
+
+ return new LockHandle(initiatorId, startMutexWorkflowInput.MutexWorkflowId, resourceId, releaseSignalName!);
+ }
+ finally
+ {
+ Workflow.Signals.Remove(acquireLockSignalName);
+ }
+ }
+
+ internal static ILockHandler CreateLockHandler()
+ {
+ if (!Workflow.InWorkflow)
+ {
+ throw new InvalidOperationException("Cannot acquire a lock outside of a workflow.");
+ }
+
+ return new LockHandler();
+ }
+
+ internal sealed class LockHandle : ILockHandle
+ {
+ private readonly string mutexWorkflowId;
+
+ public LockHandle(string lockInitiatorId, string mutexWorkflowId, string resourceId, string releaseSignalId)
+ {
+ this.LockInitiatorId = lockInitiatorId;
+ this.mutexWorkflowId = mutexWorkflowId;
+ this.ResourceId = resourceId;
+ this.ReleaseSignalName = releaseSignalId;
+ }
+
+ ///
+ public string LockInitiatorId { get; }
+
+ ///
+ public string ResourceId { get; }
+
+ ///
+ public string ReleaseSignalName { get; }
+
+ ///
+ public async ValueTask DisposeAsync()
+ {
+ var mutexHandle = Workflow.GetExternalWorkflowHandle(mutexWorkflowId);
+ await mutexHandle.SignalAsync(ReleaseSignalName, Array.Empty