diff --git a/ProtoActor.sln b/ProtoActor.sln
index 72445911ca..cf661607a2 100644
--- a/ProtoActor.sln
+++ b/ProtoActor.sln
@@ -307,6 +307,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "template", "template", "{08
examples\ClusterK8sGrains\chart\templates\protoactor-k8s-grains-serviceaccount.yaml = examples\ClusterK8sGrains\chart\templates\protoactor-k8s-grains-serviceaccount.yaml
EndProjectSection
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EndpointManagerTest", "benchmarks\EndpointManagerTest\EndpointManagerTest.csproj", "{B7258689-41D2-4284-AF93-050DD1DFEAC4}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -1481,6 +1483,18 @@ Global
{B196FBFE-0DAA-4533-9A56-BB5826A57923}.Release|x64.Build.0 = Release|Any CPU
{B196FBFE-0DAA-4533-9A56-BB5826A57923}.Release|x86.ActiveCfg = Release|Any CPU
{B196FBFE-0DAA-4533-9A56-BB5826A57923}.Release|x86.Build.0 = Release|Any CPU
+ {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|x64.Build.0 = Debug|Any CPU
+ {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|x86.Build.0 = Debug|Any CPU
+ {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|Any CPU.Build.0 = Release|Any CPU
+ {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x64.ActiveCfg = Release|Any CPU
+ {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x64.Build.0 = Release|Any CPU
+ {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x86.ActiveCfg = Release|Any CPU
+ {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -1616,6 +1630,7 @@ Global
{B196FBFE-0DAA-4533-9A56-BB5826A57923} = {ADE7A14E-FFE9-4137-AC25-E2F2A82B0A8C}
{CDCE3D4C-1BDD-460F-93B8-75123A258183} = {ADE7A14E-FFE9-4137-AC25-E2F2A82B0A8C}
{087E5441-1582-4D55-8233-014C0FB06FF0} = {CDCE3D4C-1BDD-460F-93B8-75123A258183}
+ {B7258689-41D2-4284-AF93-050DD1DFEAC4} = {0F3AB331-C042-4371-A2F0-0AFDFA13DC9F}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {CD0D1E44-8118-4682-8793-6B20ABFA824C}
diff --git a/benchmarks/EndpointManagerTest/Dockerfile b/benchmarks/EndpointManagerTest/Dockerfile
new file mode 100644
index 0000000000..ed6318e6e1
--- /dev/null
+++ b/benchmarks/EndpointManagerTest/Dockerfile
@@ -0,0 +1,23 @@
+FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
+USER $APP_UID
+WORKDIR /app
+
+FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
+ARG BUILD_CONFIGURATION=Release
+WORKDIR /src
+COPY ["benchmarks/EndpointManagerTest/EndpointManagerTest.csproj", "benchmarks/EndpointManagerTest/"]
+COPY ["src/Proto.Actor/Proto.Actor.csproj", "src/Proto.Actor/"]
+COPY ["src/Proto.Remote/Proto.Remote.csproj", "src/Proto.Remote/"]
+RUN dotnet restore "benchmarks/EndpointManagerTest/EndpointManagerTest.csproj"
+COPY . .
+WORKDIR "/src/benchmarks/EndpointManagerTest"
+RUN dotnet build "EndpointManagerTest.csproj" -c $BUILD_CONFIGURATION -o /app/build
+
+FROM build AS publish
+ARG BUILD_CONFIGURATION=Release
+RUN dotnet publish "EndpointManagerTest.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false
+
+FROM base AS final
+WORKDIR /app
+COPY --from=publish /app/publish .
+ENTRYPOINT ["dotnet", "EndpointManagerTest.dll"]
diff --git a/benchmarks/EndpointManagerTest/EndpointManagerTest.csproj b/benchmarks/EndpointManagerTest/EndpointManagerTest.csproj
new file mode 100644
index 0000000000..27ae7a51bf
--- /dev/null
+++ b/benchmarks/EndpointManagerTest/EndpointManagerTest.csproj
@@ -0,0 +1,22 @@
+
+
+
+ Exe
+ net8.0
+ enable
+ enable
+ Linux
+
+
+
+
+
+
+
+
+
+ .dockerignore
+
+
+
+
diff --git a/benchmarks/EndpointManagerTest/Program.cs b/benchmarks/EndpointManagerTest/Program.cs
new file mode 100644
index 0000000000..710c4a95b2
--- /dev/null
+++ b/benchmarks/EndpointManagerTest/Program.cs
@@ -0,0 +1,117 @@
+using System.Diagnostics;
+using Microsoft.Extensions.Logging;
+using Proto;
+using Proto.Remote;
+using Proto.Remote.GrpcNet;
+
+namespace EndpointManagerTest;
+
+// This program tests lockup issues with the EndpointManager.
+// TL:DR; This is to demonstrate the issue with the locking and blocking waits in EndpointManager, and to confirm the fix.
+//
+// This recreates a scenario we were seeing in our production environments.
+// What we saw was 30 cluster clients were sending many messages to 2 of the cluster members, who were sending messages to eachother depending
+// on actor placement. If something happens and the 2 members had to reboot, they would end up locking up, not being able to do anything.
+// This scenario has been recreated more simply here, where you have 2 members sending many messages back and forth, a disconnect comes through
+// from a member that recently restarted, and new connections are being opened to other members. Putting all these together, we end up in a situation
+// where many threads get stuck at a lock in EndpointManager, while the one thread inside of the lock is waiting for a ServerConnector to stop.
+// NOTE: that this can be a bit flakey as we are trying to reproduce a complete thread lockup. So there is a dockerfile to run it in a more consistent
+// environment. Using `--cpus="1"` with docker will make it even more consistent, but sometimes it takes a few tries to repro.
+// You will know you reproduced it when you stop seeing "This should log every second." every second. you may also see the built in
+// "ThreadPool is running hot" log, but the absence of that log is ambiguous, since if it's locked up it won't finish to log how long it took!
+// The other indicator is that all the new connections made at the end should be logging terminations and reconnects and quickly give up (since they don't exist),
+// but of course that won't be happening when you're locked up. Also seeing any "terminating" messages without a corresponding "terminated" message
+// also indicates that you're locked up.
+class Program
+{
+ private static async Task Main()
+ {
+ Log.SetLoggerFactory(
+ LoggerFactory.Create(
+ c =>
+ c.SetMinimumLevel(LogLevel.Debug)
+ .AddFilter("Microsoft", LogLevel.None)
+ .AddFilter("Grpc", LogLevel.None)
+ .AddFilter("Proto.Context.ActorContext", LogLevel.Information)
+ .AddFilter("Proto.Diagnostics.DiagnosticsStore", LogLevel.Warning)
+ .AddFilter("Proto.Remote.ServerConnector", LogLevel.Error)
+ .AddSimpleConsole(o => o.SingleLine = true)
+ )
+ );
+
+ var logger = Log.CreateLogger("Main");
+
+ _ = Task.Factory.StartNew(async () =>
+ {
+ while (true)
+ {
+ try
+ {
+ await Task.Factory.StartNew(async () => { await Task.Yield(); });
+ }
+ catch (Exception)
+ {
+ }
+
+ logger.LogInformation("This should log every second [pending: {pendingWorkItems}].", ThreadPool.PendingWorkItemCount);
+ await Task.Delay(1000);
+ }
+ });
+
+ var sys1 = new ActorSystem().WithRemote(GrpcNetRemoteConfig.BindTo("localhost", 12000).WithRemoteKind("noop", Props.FromProducer(() => new NoopActor())));
+ await sys1.Remote().StartAsync();
+
+ var sys2 = new ActorSystem().WithRemote(GrpcNetRemoteConfig.BindTo("localhost", 12001).WithRemoteKind("noop", Props.FromProducer(() => new NoopActor())));
+ await sys2.Remote().StartAsync();
+
+ var echoActorOn2 = (await sys1.Remote().SpawnAsync("localhost:12001", "noop", TimeSpan.FromSeconds(1))).Pid;
+ _ = Task.Factory.StartNew(async () =>
+ {
+ while (true)
+ {
+ for (var i = 0; i < 200; i++)
+ {
+ _ = sys1.Root.RequestAsync(echoActorOn2, new Touch());
+ }
+ await Task.Yield();
+ }
+ });
+
+ var echoActorOn1 = (await sys2.Remote().SpawnAsync("localhost:12000", "noop", TimeSpan.FromSeconds(1))).Pid;
+ _ = Task.Factory.StartNew(async () =>
+ {
+ while (true)
+ {
+ for (var i = 0; i < 200; i++)
+ {
+ _ = sys2.Root.RequestAsync(echoActorOn1, new Touch());
+ }
+ await Task.Yield();
+ }
+ });
+
+ await Task.Delay(3000);
+
+ sys1.EventStream.Publish(new EndpointTerminatedEvent(false, "localhost:12001", null));
+
+ var port = 12002;
+ for (var i = 12002; i < 12032; i++)
+ {
+ //logger.LogInformation("Touching {i}", i);
+ _ = sys1.Root.RequestAsync(new PID($"localhost:{i}", "$1"), new Touch());
+ }
+
+ while (true)
+ {
+ //logger.LogInformation("End");
+ await Task.Delay(1000);
+ }
+ }
+}
+
+public class NoopActor : IActor
+{
+ public async Task ReceiveAsync(IContext context)
+ {
+ }
+}
\ No newline at end of file
diff --git a/src/Proto.Remote/Endpoints/EndpointManager.cs b/src/Proto.Remote/Endpoints/EndpointManager.cs
index e564fb422f..9fd4904321 100644
--- a/src/Proto.Remote/Endpoints/EndpointManager.cs
+++ b/src/Proto.Remote/Endpoints/EndpointManager.cs
@@ -53,10 +53,10 @@ public EndpointManager(ActorSystem system, RemoteConfigBase remoteConfig, IChann
public void Start() => SpawnActivator();
- public void Stop()
+ public async Task StopAsync()
{
lock (_synLock)
- {
+ {
if (CancellationToken.IsCancellationRequested)
{
return;
@@ -67,27 +67,28 @@ public void Stop()
_system.EventStream.Unsubscribe(_endpointTerminatedEvnSub);
_cancellationTokenSource.Cancel();
+ }
+
+ // release the lock while we dispose, other threads will see the cancellation token and return blocked endpoint.
+ foreach (var endpoint in _serverEndpoints.Values)
+ {
+ await endpoint.DisposeAsync().ConfigureAwait(false);
+ }
- foreach (var endpoint in _serverEndpoints.Values)
- {
- endpoint.DisposeAsync().GetAwaiter().GetResult();
- }
-
- foreach (var endpoint in _clientEndpoints.Values)
- {
- endpoint.DisposeAsync().GetAwaiter().GetResult();
- }
+ foreach (var endpoint in _clientEndpoints.Values)
+ {
+ await endpoint.DisposeAsync().ConfigureAwait(false);
+ }
- _serverEndpoints.Clear();
- _clientEndpoints.Clear();
+ _serverEndpoints.Clear();
+ _clientEndpoints.Clear();
- StopActivator();
+ StopActivator();
- Logger.LogDebug("[{SystemAddress}] Stopped", _system.Address);
- }
+ Logger.LogDebug("[{SystemAddress}] Stopped", _system.Address);
}
- private void OnEndpointTerminated(EndpointTerminatedEvent evt)
+ private async Task OnEndpointTerminated(EndpointTerminatedEvent evt)
{
if (Logger.IsEnabled(LogLevel.Debug))
{
@@ -95,41 +96,43 @@ private void OnEndpointTerminated(EndpointTerminatedEvent evt)
evt.Address ?? evt.ActorSystemId);
}
+ Action? unblock = null;
+ IEndpoint? endpoint = null;
lock (_synLock)
{
- if (evt.Address is not null && _serverEndpoints.TryRemove(evt.Address, out var endpoint))
+ if (_cancellationTokenSource.IsCancellationRequested)
{
- endpoint.DisposeAsync().GetAwaiter().GetResult();
-
- if (evt.ShouldBlock && _remoteConfig.WaitAfterEndpointTerminationTimeSpan.HasValue &&
- _blockedAddresses.TryAdd(evt.Address, DateTime.UtcNow))
- {
- _ = SafeTask.Run(async () =>
- {
- await Task.Delay(_remoteConfig.WaitAfterEndpointTerminationTimeSpan.Value)
- .ConfigureAwait(false);
-
- _blockedAddresses.TryRemove(evt.Address, out _);
- });
- }
+ return;
+ }
+
+ if (evt.Address is not null && _serverEndpoints.TryRemove(evt.Address, out endpoint))
+ {
+ _blockedAddresses.TryAdd(evt.Address, DateTime.UtcNow);
+ unblock = () => _blockedAddresses.TryRemove(evt.Address, out _);
}
if (evt.ActorSystemId is not null && _clientEndpoints.TryRemove(evt.ActorSystemId, out endpoint))
{
- endpoint.DisposeAsync().GetAwaiter().GetResult();
-
- if (evt.ShouldBlock && _remoteConfig.WaitAfterEndpointTerminationTimeSpan.HasValue &&
- _blockedClientSystemIds.TryAdd(evt.ActorSystemId, DateTime.UtcNow))
+ _blockedClientSystemIds.TryAdd(evt.ActorSystemId, DateTime.UtcNow);
+ unblock = () => _blockedClientSystemIds.TryRemove(evt.ActorSystemId, out _);
+ }
+ }
+
+ if (endpoint != null)
+ {
+ // leave the lock to dispose the endpoint, so that requests can't build up behind the lock
+ // the address will always be blocked while we dispose, at a minimum
+ await endpoint.DisposeAsync().ConfigureAwait(false);
+ if (evt.ShouldBlock && _remoteConfig.WaitAfterEndpointTerminationTimeSpan.HasValue)
+ {
+ await Task.Delay(_remoteConfig.WaitAfterEndpointTerminationTimeSpan.Value, CancellationToken).ConfigureAwait(false);
+ if (_cancellationTokenSource.IsCancellationRequested)
{
- _ = SafeTask.Run(async () =>
- {
- await Task.Delay(_remoteConfig.WaitAfterEndpointTerminationTimeSpan.Value)
- .ConfigureAwait(false);
-
- _blockedClientSystemIds.TryRemove(evt.ActorSystemId, out _);
- });
+ return;
}
}
+
+ unblock?.Invoke();
}
Logger.LogDebug("[{SystemAddress}] Endpoint {Address} terminated", _system.Address,
@@ -157,11 +160,16 @@ internal IEndpoint GetOrAddServerEndpoint(string? address)
lock (_synLock)
{
+ if (_cancellationTokenSource.IsCancellationRequested || _blockedAddresses.ContainsKey(address))
+ {
+ return _blockedEndpoint;
+ }
+
if (_serverEndpoints.TryGetValue(address, out endpoint))
{
return endpoint;
}
-
+
if (_system.Address.StartsWith(ActorSystem.Client, StringComparison.Ordinal))
{
if (Logger.IsEnabled(LogLevel.Debug))
@@ -212,6 +220,11 @@ internal IEndpoint GetOrAddClientEndpoint(string systemId)
lock (_synLock)
{
+ if (_cancellationTokenSource.IsCancellationRequested || _blockedClientSystemIds.ContainsKey(systemId))
+ {
+ return _blockedEndpoint;
+ }
+
if (_clientEndpoints.TryGetValue(systemId, out endpoint))
{
return endpoint;
diff --git a/src/Proto.Remote/Endpoints/EndpointReader.cs b/src/Proto.Remote/Endpoints/EndpointReader.cs
index 94a1b458b6..602d3484b5 100644
--- a/src/Proto.Remote/Endpoints/EndpointReader.cs
+++ b/src/Proto.Remote/Endpoints/EndpointReader.cs
@@ -42,6 +42,8 @@ ServerCallContext context
throw new RpcException(Status.DefaultCancelled, "Suspended");
}
+ var cancellationTokenSource = new CancellationTokenSource();
+
async void Disconnect()
{
try
@@ -60,6 +62,13 @@ async void Disconnect()
Logger.LogWarning("[EndpointReader][{SystemAddress}] Failed to write disconnect message to the stream",
_system.Address);
}
+ finally
+ {
+ // When we disconnect, cancel the token, so the reader and writer both stop, and this method returns,
+ // so that the stream actually closes. Without this, when kestrel begins shutdown, it's possible the
+ // connection will stay open until the kestrel shutdown timeout is reached.
+ cancellationTokenSource.Cancel();
+ }
}
await using (_endpointManager.CancellationToken.Register(Disconnect).ConfigureAwait(false))
@@ -81,8 +90,6 @@ async void Disconnect()
var connectRequest = requestStream.Current.ConnectRequest;
- var cancellationTokenSource = new CancellationTokenSource();
-
switch (connectRequest.ConnectionTypeCase)
{
case ConnectRequest.ConnectionTypeOneofCase.ClientConnection:
@@ -205,7 +212,7 @@ private async Task RunReader(IAsyncStreamReader requestStream, st
{
try
{
- while (await requestStream.MoveNext(CancellationToken.None).ConfigureAwait(false))
+ while (await requestStream.MoveNext(cancellationTokenSource.Token).ConfigureAwait(false))
{
var currentMessage = requestStream.Current;
diff --git a/src/Proto.Remote/GrpcNet/GrpcNetClientRemote.cs b/src/Proto.Remote/GrpcNet/GrpcNetClientRemote.cs
index 847c554c02..b7ad056c44 100644
--- a/src/Proto.Remote/GrpcNet/GrpcNetClientRemote.cs
+++ b/src/Proto.Remote/GrpcNet/GrpcNetClientRemote.cs
@@ -42,13 +42,13 @@ public GrpcNetClientRemote(ActorSystem system, GrpcNetRemoteConfig config)
public BlockList BlockList { get; }
public bool Started { get; private set; }
- public Task ShutdownAsync(bool graceful = true)
+ public async Task ShutdownAsync(bool graceful = true)
{
lock (_lock)
{
if (!Started)
{
- return Task.CompletedTask;
+ return;
}
Started = false;
@@ -58,7 +58,7 @@ public Task ShutdownAsync(bool graceful = true)
{
if (graceful)
{
- _endpointManager.Stop();
+ await _endpointManager.StopAsync();
}
_logger.LogInformation(
@@ -73,8 +73,6 @@ public Task ShutdownAsync(bool graceful = true)
System.Id, ex.Message
);
}
-
- return Task.CompletedTask;
}
public Task StartAsync()
diff --git a/src/Proto.Remote/GrpcNet/GrpcNetRemote.cs b/src/Proto.Remote/GrpcNet/GrpcNetRemote.cs
index 1c873517f5..33e7b79aa0 100644
--- a/src/Proto.Remote/GrpcNet/GrpcNetRemote.cs
+++ b/src/Proto.Remote/GrpcNet/GrpcNetRemote.cs
@@ -157,7 +157,7 @@ public async Task ShutdownAsync(bool graceful = true)
{
if (graceful)
{
- _endpointManager.Stop();
+ await _endpointManager.StopAsync();
if (_host is not null)
{
diff --git a/src/Proto.Remote/GrpcNet/HostedGrpcNetRemote.cs b/src/Proto.Remote/GrpcNet/HostedGrpcNetRemote.cs
index fca7e8e3d3..eb6fd22c9e 100644
--- a/src/Proto.Remote/GrpcNet/HostedGrpcNetRemote.cs
+++ b/src/Proto.Remote/GrpcNet/HostedGrpcNetRemote.cs
@@ -66,37 +66,35 @@ public Task StartAsync()
}
}
- public Task ShutdownAsync(bool graceful = true)
+ public async Task ShutdownAsync(bool graceful = true)
{
lock (_lock)
{
if (!Started)
{
- return Task.CompletedTask;
+ return;
}
- try
- {
- _endpointManager.Stop();
-
- _logger.LogInformation(
- "Proto.Actor server stopped on {Address}. Graceful: {Graceful}",
- System.Address, graceful
- );
- }
- catch (Exception ex)
- {
- _logger.LogError(
- ex, "Proto.Actor server stopped on {Address} with error: {MessagePayload}",
- System.Address, ex.Message
- );
+ Started = false;
+ }
- throw;
- }
+ try
+ {
+ await _endpointManager.StopAsync();
- Started = false;
+ _logger.LogInformation(
+ "Proto.Actor server stopped on {Address}. Graceful: {Graceful}",
+ System.Address, graceful
+ );
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(
+ ex, "Proto.Actor server stopped on {Address} with error: {MessagePayload}",
+ System.Address, ex.Message
+ );
- return Task.CompletedTask;
+ throw;
}
}
}
\ No newline at end of file
diff --git a/tests/Proto.Actor.Tests/DisposableActorTests.cs b/tests/Proto.Actor.Tests/DisposableActorTests.cs
index aa3a41b14e..6d16d43c8e 100644
--- a/tests/Proto.Actor.Tests/DisposableActorTests.cs
+++ b/tests/Proto.Actor.Tests/DisposableActorTests.cs
@@ -54,7 +54,7 @@ public async Task WhenActorRestarted_DisposeAsyncIsCalled()
var parent = context.Spawn(props);
context.Send(parent, "crash");
- childMailboxStats.Reset.Wait(1000);
+ childMailboxStats.Reset.Wait(2000);
Assert.True(disposeCalled);
}
diff --git a/tests/Proto.Actor.Tests/SupervisionTests_OneForOne.cs b/tests/Proto.Actor.Tests/SupervisionTests_OneForOne.cs
index 46b4a7415c..2fa4323801 100644
--- a/tests/Proto.Actor.Tests/SupervisionTests_OneForOne.cs
+++ b/tests/Proto.Actor.Tests/SupervisionTests_OneForOne.cs
@@ -79,7 +79,7 @@ public async Task OneForOneStrategy_Should_RestartChildOnFailure()
context.Send(parent, "hello");
- childMailboxStats.Reset.Wait(1000);
+ childMailboxStats.Reset.Wait(2000);
Assert.Contains(childMailboxStats.Posted, msg => msg is Restart);
Assert.Contains(childMailboxStats.Received, msg => msg is Restart);
}
@@ -147,7 +147,7 @@ public async Task
context.Send(parent, "3rd restart");
context.Send(parent, "4th restart");
- childMailboxStats.Reset.Wait(1000);
+ childMailboxStats.Reset.Wait(2000);
Assert.Contains(Stop.Instance, childMailboxStats.Posted);
Assert.Contains(Stop.Instance, childMailboxStats.Received);
}