Fix - Under load and during topology changes, thread saturation can occur, causing a lockup (#2139)

* Add endpoint manager test to repro thread lockup

fix merge

* add explanation and dockerfile

* Block the endpoint while it disposes instead of holding requests behind a lock. This also allows messages to other endpoints while one is disposing, as well as multiple endpoints disposing at the same time (see the sketch just after this list).

* Change EndpointManager Stop to StopAsync

* Ensure the EndpointReader always finishes the request after sending the DisconnectRequest, so it doesn't time out during Kestrel shutdown

* Increase timeouts on a couple of tests so they fail less often

* Increase another timeout on a flaky test
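In a minimal sketch (names follow the EndpointManager code in this commit; this condenses the diff below rather than quoting it), the locking change looks like this:

// Before (simplified): the dispose was waited on synchronously while holding the lock,
// so every other sender trying to resolve an endpoint piled up behind _synLock.
lock (_synLock)
{
    if (_serverEndpoints.TryRemove(address, out var endpoint))
    {
        endpoint.DisposeAsync().GetAwaiter().GetResult();
    }
}

// After (simplified): only bookkeeping happens under the lock; the address is marked
// blocked so lookups return a blocked endpoint, and the slow dispose is awaited outside.
IEndpoint? endpoint = null;
lock (_synLock)
{
    if (_serverEndpoints.TryRemove(address, out endpoint))
    {
        _blockedAddresses.TryAdd(address, DateTime.UtcNow);
    }
}

if (endpoint != null)
{
    await endpoint.DisposeAsync().ConfigureAwait(false);
}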
benbenwilde authored Nov 5, 2024
1 parent 4bbf51b commit eaac059
Showing 11 changed files with 269 additions and 76 deletions.
15 changes: 15 additions & 0 deletions ProtoActor.sln
@@ -307,6 +307,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "template", "template", "{08
examples\ClusterK8sGrains\chart\templates\protoactor-k8s-grains-serviceaccount.yaml = examples\ClusterK8sGrains\chart\templates\protoactor-k8s-grains-serviceaccount.yaml
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EndpointManagerTest", "benchmarks\EndpointManagerTest\EndpointManagerTest.csproj", "{B7258689-41D2-4284-AF93-050DD1DFEAC4}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -1481,6 +1483,18 @@ Global
{B196FBFE-0DAA-4533-9A56-BB5826A57923}.Release|x64.Build.0 = Release|Any CPU
{B196FBFE-0DAA-4533-9A56-BB5826A57923}.Release|x86.ActiveCfg = Release|Any CPU
{B196FBFE-0DAA-4533-9A56-BB5826A57923}.Release|x86.Build.0 = Release|Any CPU
{B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|x64.ActiveCfg = Debug|Any CPU
{B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|x64.Build.0 = Debug|Any CPU
{B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|x86.ActiveCfg = Debug|Any CPU
{B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|x86.Build.0 = Debug|Any CPU
{B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|Any CPU.Build.0 = Release|Any CPU
{B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x64.ActiveCfg = Release|Any CPU
{B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x64.Build.0 = Release|Any CPU
{B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x86.ActiveCfg = Release|Any CPU
{B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -1616,6 +1630,7 @@ Global
{B196FBFE-0DAA-4533-9A56-BB5826A57923} = {ADE7A14E-FFE9-4137-AC25-E2F2A82B0A8C}
{CDCE3D4C-1BDD-460F-93B8-75123A258183} = {ADE7A14E-FFE9-4137-AC25-E2F2A82B0A8C}
{087E5441-1582-4D55-8233-014C0FB06FF0} = {CDCE3D4C-1BDD-460F-93B8-75123A258183}
{B7258689-41D2-4284-AF93-050DD1DFEAC4} = {0F3AB331-C042-4371-A2F0-0AFDFA13DC9F}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {CD0D1E44-8118-4682-8793-6B20ABFA824C}
23 changes: 23 additions & 0 deletions benchmarks/EndpointManagerTest/Dockerfile
@@ -0,0 +1,23 @@
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
USER $APP_UID
WORKDIR /app

FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
ARG BUILD_CONFIGURATION=Release
WORKDIR /src
COPY ["benchmarks/EndpointManagerTest/EndpointManagerTest.csproj", "benchmarks/EndpointManagerTest/"]
COPY ["src/Proto.Actor/Proto.Actor.csproj", "src/Proto.Actor/"]
COPY ["src/Proto.Remote/Proto.Remote.csproj", "src/Proto.Remote/"]
RUN dotnet restore "benchmarks/EndpointManagerTest/EndpointManagerTest.csproj"
COPY . .
WORKDIR "/src/benchmarks/EndpointManagerTest"
RUN dotnet build "EndpointManagerTest.csproj" -c $BUILD_CONFIGURATION -o /app/build

FROM build AS publish
ARG BUILD_CONFIGURATION=Release
RUN dotnet publish "EndpointManagerTest.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "EndpointManagerTest.dll"]
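To run the repro in the more consistent environment the test's comments describe, build from the repository root (so the COPY paths resolve) and pass the CPU limit; the image tag below is just an example name:

docker build -f benchmarks/EndpointManagerTest/Dockerfile -t endpointmanager-test .
docker run --rm --cpus="1" endpointmanager-test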
22 changes: 22 additions & 0 deletions benchmarks/EndpointManagerTest/EndpointManagerTest.csproj
@@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Proto.Actor\Proto.Actor.csproj" />
<ProjectReference Include="..\..\src\Proto.Remote\Proto.Remote.csproj" />
</ItemGroup>

<ItemGroup>
<Content Include="..\..\.dockerignore">
<Link>.dockerignore</Link>
</Content>
</ItemGroup>

</Project>
117 changes: 117 additions & 0 deletions benchmarks/EndpointManagerTest/Program.cs
@@ -0,0 +1,117 @@
using System.Diagnostics;
using Microsoft.Extensions.Logging;
using Proto;
using Proto.Remote;
using Proto.Remote.GrpcNet;

namespace EndpointManagerTest;

// This program reproduces lockup issues in the EndpointManager.
// TL;DR: it demonstrates the issue with the locking and blocking waits in EndpointManager, and confirms the fix.
//
// This recreates a scenario we were seeing in our production environments.
// There, 30 cluster clients were sending many messages to 2 of the cluster members, which were also sending messages to each other depending
// on actor placement. If something happened and the 2 members had to reboot, they would end up locking up, unable to do anything.
// That scenario is recreated more simply here: 2 members send many messages back and forth, a disconnect comes through
// from a member that recently restarted, and new connections are opened to other members. Put together, this leads to a situation
// where many threads get stuck at a lock in EndpointManager, while the one thread inside the lock is waiting for a ServerConnector to stop.
// NOTE: this can be a bit flaky since we are trying to reproduce a complete thread lockup, so there is a Dockerfile to run it in a more consistent
// environment. Using `--cpus="1"` with docker makes it even more consistent, but sometimes it takes a few tries to repro.
// You will know you reproduced it when you stop seeing "This should log every second." every second. You may also see the built-in
// "ThreadPool is running hot" log, but the absence of that log is ambiguous, since a locked-up pool never finishes measuring how long it took!
// Another indicator: the new connections made at the end should log terminations and reconnects and quickly give up (since the targets don't exist),
// but of course that won't happen while you're locked up. Seeing any "terminating" message without a corresponding "terminated" message
// also indicates that you're locked up.
class Program
{
private static async Task Main()
{
Log.SetLoggerFactory(
LoggerFactory.Create(
c =>
c.SetMinimumLevel(LogLevel.Debug)
.AddFilter("Microsoft", LogLevel.None)
.AddFilter("Grpc", LogLevel.None)
.AddFilter("Proto.Context.ActorContext", LogLevel.Information)
.AddFilter("Proto.Diagnostics.DiagnosticsStore", LogLevel.Warning)
.AddFilter("Proto.Remote.ServerConnector", LogLevel.Error)
.AddSimpleConsole(o => o.SingleLine = true)
)
);

var logger = Log.CreateLogger("Main");

_ = Task.Factory.StartNew(async () =>
{
while (true)
{
try
{
await Task.Factory.StartNew(async () => { await Task.Yield(); });
}
catch (Exception)
{
}

logger.LogInformation("This should log every second [pending: {pendingWorkItems}].", ThreadPool.PendingWorkItemCount);
await Task.Delay(1000);
}
});

var sys1 = new ActorSystem().WithRemote(GrpcNetRemoteConfig.BindTo("localhost", 12000).WithRemoteKind("noop", Props.FromProducer(() => new NoopActor())));
await sys1.Remote().StartAsync();

var sys2 = new ActorSystem().WithRemote(GrpcNetRemoteConfig.BindTo("localhost", 12001).WithRemoteKind("noop", Props.FromProducer(() => new NoopActor())));
await sys2.Remote().StartAsync();

var echoActorOn2 = (await sys1.Remote().SpawnAsync("localhost:12001", "noop", TimeSpan.FromSeconds(1))).Pid;
_ = Task.Factory.StartNew(async () =>
{
while (true)
{
for (var i = 0; i < 200; i++)
{
_ = sys1.Root.RequestAsync<Touched>(echoActorOn2, new Touch());
}
await Task.Yield();
}
});

var echoActorOn1 = (await sys2.Remote().SpawnAsync("localhost:12000", "noop", TimeSpan.FromSeconds(1))).Pid;
_ = Task.Factory.StartNew(async () =>
{
while (true)
{
for (var i = 0; i < 200; i++)
{
_ = sys2.Root.RequestAsync<Touched>(echoActorOn1, new Touch());
}
await Task.Yield();
}
});

await Task.Delay(3000);

sys1.EventStream.Publish(new EndpointTerminatedEvent(false, "localhost:12001", null));

var port = 12002;
for (var i = 12002; i < 12032; i++)
{
//logger.LogInformation("Touching {i}", i);
_ = sys1.Root.RequestAsync<Touched>(new PID($"localhost:{i}", "$1"), new Touch());
}

while (true)
{
//logger.LogInformation("End");
await Task.Delay(1000);
}
}
}

public class NoopActor : IActor
{
public async Task ReceiveAsync(IContext context)
{
}
}
99 changes: 56 additions & 43 deletions src/Proto.Remote/Endpoints/EndpointManager.cs
@@ -53,10 +53,10 @@ public EndpointManager(ActorSystem system, RemoteConfigBase remoteConfig, IChann

public void Start() => SpawnActivator();

public void Stop()
public async Task StopAsync()
{
lock (_synLock)
{
{
if (CancellationToken.IsCancellationRequested)
{
return;
@@ -67,69 +67,72 @@ public void Stop()
_system.EventStream.Unsubscribe(_endpointTerminatedEvnSub);

_cancellationTokenSource.Cancel();
}

// release the lock while we dispose, other threads will see the cancellation token and return blocked endpoint.
foreach (var endpoint in _serverEndpoints.Values)
{
await endpoint.DisposeAsync().ConfigureAwait(false);
}

foreach (var endpoint in _serverEndpoints.Values)
{
endpoint.DisposeAsync().GetAwaiter().GetResult();
}

foreach (var endpoint in _clientEndpoints.Values)
{
endpoint.DisposeAsync().GetAwaiter().GetResult();
}
foreach (var endpoint in _clientEndpoints.Values)
{
await endpoint.DisposeAsync().ConfigureAwait(false);
}

_serverEndpoints.Clear();
_clientEndpoints.Clear();
_serverEndpoints.Clear();
_clientEndpoints.Clear();

StopActivator();
StopActivator();

Logger.LogDebug("[{SystemAddress}] Stopped", _system.Address);
}
Logger.LogDebug("[{SystemAddress}] Stopped", _system.Address);
}

private void OnEndpointTerminated(EndpointTerminatedEvent evt)
private async Task OnEndpointTerminated(EndpointTerminatedEvent evt)
{
if (Logger.IsEnabled(LogLevel.Debug))
{
Logger.LogDebug("[{SystemAddress}] Endpoint {Address} terminating", _system.Address,
evt.Address ?? evt.ActorSystemId);
}

Action? unblock = null;
IEndpoint? endpoint = null;
lock (_synLock)
{
if (evt.Address is not null && _serverEndpoints.TryRemove(evt.Address, out var endpoint))
if (_cancellationTokenSource.IsCancellationRequested)
{
endpoint.DisposeAsync().GetAwaiter().GetResult();

if (evt.ShouldBlock && _remoteConfig.WaitAfterEndpointTerminationTimeSpan.HasValue &&
_blockedAddresses.TryAdd(evt.Address, DateTime.UtcNow))
{
_ = SafeTask.Run(async () =>
{
await Task.Delay(_remoteConfig.WaitAfterEndpointTerminationTimeSpan.Value)
.ConfigureAwait(false);

_blockedAddresses.TryRemove(evt.Address, out _);
});
}
return;
}

if (evt.Address is not null && _serverEndpoints.TryRemove(evt.Address, out endpoint))
{
_blockedAddresses.TryAdd(evt.Address, DateTime.UtcNow);
unblock = () => _blockedAddresses.TryRemove(evt.Address, out _);
}

if (evt.ActorSystemId is not null && _clientEndpoints.TryRemove(evt.ActorSystemId, out endpoint))
{
endpoint.DisposeAsync().GetAwaiter().GetResult();

if (evt.ShouldBlock && _remoteConfig.WaitAfterEndpointTerminationTimeSpan.HasValue &&
_blockedClientSystemIds.TryAdd(evt.ActorSystemId, DateTime.UtcNow))
_blockedClientSystemIds.TryAdd(evt.ActorSystemId, DateTime.UtcNow);
unblock = () => _blockedClientSystemIds.TryRemove(evt.ActorSystemId, out _);
}
}

if (endpoint != null)
{
// leave the lock to dispose the endpoint, so that requests can't build up behind the lock
// the address will always be blocked while we dispose, at a minimum
await endpoint.DisposeAsync().ConfigureAwait(false);
if (evt.ShouldBlock && _remoteConfig.WaitAfterEndpointTerminationTimeSpan.HasValue)
{
await Task.Delay(_remoteConfig.WaitAfterEndpointTerminationTimeSpan.Value, CancellationToken).ConfigureAwait(false);
if (_cancellationTokenSource.IsCancellationRequested)
{
_ = SafeTask.Run(async () =>
{
await Task.Delay(_remoteConfig.WaitAfterEndpointTerminationTimeSpan.Value)
.ConfigureAwait(false);

_blockedClientSystemIds.TryRemove(evt.ActorSystemId, out _);
});
return;
}
}

unblock?.Invoke();
}

Logger.LogDebug("[{SystemAddress}] Endpoint {Address} terminated", _system.Address,
@@ -157,11 +160,16 @@ internal IEndpoint GetOrAddServerEndpoint(string? address)

lock (_synLock)
{
if (_cancellationTokenSource.IsCancellationRequested || _blockedAddresses.ContainsKey(address))
{
return _blockedEndpoint;
}

if (_serverEndpoints.TryGetValue(address, out endpoint))
{
return endpoint;
}

if (_system.Address.StartsWith(ActorSystem.Client, StringComparison.Ordinal))
{
if (Logger.IsEnabled(LogLevel.Debug))
@@ -212,6 +220,11 @@ internal IEndpoint GetOrAddClientEndpoint(string systemId)

lock (_synLock)
{
if (_cancellationTokenSource.IsCancellationRequested || _blockedClientSystemIds.ContainsKey(systemId))
{
return _blockedEndpoint;
}

if (_clientEndpoints.TryGetValue(systemId, out endpoint))
{
return endpoint;
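Taken together, the dispose path and the lookup path now interact roughly like this sketch (condensed from the hunks above; `CreateServerEndpoint` is a placeholder for the existing endpoint-creation code, not a real method name):

internal IEndpoint GetOrAddServerEndpoint(string address)
{
    lock (_synLock)
    {
        // While a terminated endpoint is still disposing, its address stays in _blockedAddresses,
        // so senders immediately get the blocked endpoint instead of queueing up behind _synLock.
        if (_cancellationTokenSource.IsCancellationRequested || _blockedAddresses.ContainsKey(address))
        {
            return _blockedEndpoint;
        }

        if (_serverEndpoints.TryGetValue(address, out var endpoint))
        {
            return endpoint;
        }

        return CreateServerEndpoint(address); // placeholder for the existing spawn/registration logic
    }
}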
13 changes: 10 additions & 3 deletions src/Proto.Remote/Endpoints/EndpointReader.cs
@@ -42,6 +42,8 @@ ServerCallContext context
throw new RpcException(Status.DefaultCancelled, "Suspended");
}

var cancellationTokenSource = new CancellationTokenSource();

async void Disconnect()
{
try
@@ -60,6 +62,13 @@ async void Disconnect()
Logger.LogWarning("[EndpointReader][{SystemAddress}] Failed to write disconnect message to the stream",
_system.Address);
}
finally
{
// When we disconnect, cancel the token, so the reader and writer both stop, and this method returns,
// so that the stream actually closes. Without this, when kestrel begins shutdown, it's possible the
// connection will stay open until the kestrel shutdown timeout is reached.
cancellationTokenSource.Cancel();
}
}

await using (_endpointManager.CancellationToken.Register(Disconnect).ConfigureAwait(false))
Expand All @@ -81,8 +90,6 @@ async void Disconnect()

var connectRequest = requestStream.Current.ConnectRequest;

var cancellationTokenSource = new CancellationTokenSource();

switch (connectRequest.ConnectionTypeCase)
{
case ConnectRequest.ConnectionTypeOneofCase.ClientConnection:
@@ -205,7 +212,7 @@ private async Task RunReader(IAsyncStreamReader<RemoteMessage> requestStream, st
{
try
{
while (await requestStream.MoveNext(CancellationToken.None).ConfigureAwait(false))
while (await requestStream.MoveNext(cancellationTokenSource.Token).ConfigureAwait(false))
{
var currentMessage = requestStream.Current;

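The shutdown fix in this file boils down to tying the reader loop to a token that the Disconnect callback cancels once the DisconnectRequest has been sent; a condensed sketch (illustrative, not the literal method bodies):

var cancellationTokenSource = new CancellationTokenSource();

async void Disconnect()
{
    try
    {
        // write the DisconnectRequest to the response stream...
    }
    finally
    {
        // Cancel so the reader loop below exits and the call handler returns; otherwise the
        // stream can stay open until Kestrel's shutdown timeout is reached.
        cancellationTokenSource.Cancel();
    }
}

// Reader loop: observes the same token instead of CancellationToken.None.
while (await requestStream.MoveNext(cancellationTokenSource.Token).ConfigureAwait(false))
{
    var currentMessage = requestStream.Current;
    // dispatch currentMessage...
}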