Fix - Under load and during topology changes, thread saturation can occur, causing a lockup #2139

Merged · 7 commits · Nov 5, 2024
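At its core, the reported lockup is a sync-over-async hazard: EndpointManager blocked inside a lock on DisposeAsync().GetAwaiter().GetResult(), so under load every thread-pool worker ends up parked either inside or behind that lock, and no worker remains free to complete the very task being waited on. A minimal, self-contained sketch of the failure mode (illustrative code, not Proto.Actor's; the capped pool stands in for the constrained CPU the repro uses):

using System;
using System.Threading;
using System.Threading.Tasks;

class ThreadSaturationSketch
{
    private static readonly object SynLock = new();

    static void Main()
    {
        // Cap the pool at the core count so hill-climbing thread injection cannot mask the starvation.
        var cores = Environment.ProcessorCount;
        ThreadPool.SetMaxThreads(cores, cores);

        _ = HeartbeatAsync(); // the analogue of "This should log every second." in the repro below

        for (var i = 0; i < cores * 2; i++)
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                lock (SynLock)
                {
                    // Sync-over-async while holding the lock: completing the delay needs a free
                    // pool thread, but every pool thread is stuck here or waiting on the lock.
                    Task.Delay(TimeSpan.FromMinutes(1)).GetAwaiter().GetResult();
                }
            });
        }

        Thread.Sleep(Timeout.Infinite); // keep the main (non-pool) thread alive
    }

    private static async Task HeartbeatAsync()
    {
        while (true)
        {
            Console.WriteLine($"heartbeat, pending work items: {ThreadPool.PendingWorkItemCount}");
            await Task.Delay(1000); // this continuation starves once the pool saturates
        }
    }
}

The changes below remove exactly that pattern: disposal is awaited outside the lock, and lookups return a blocked endpoint instead of queueing behind a blocking wait.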
15 changes: 15 additions & 0 deletions ProtoActor.sln
@@ -307,6 +307,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "template", "template", "{08
         examples\ClusterK8sGrains\chart\templates\protoactor-k8s-grains-serviceaccount.yaml = examples\ClusterK8sGrains\chart\templates\protoactor-k8s-grains-serviceaccount.yaml
     EndProjectSection
 EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EndpointManagerTest", "benchmarks\EndpointManagerTest\EndpointManagerTest.csproj", "{B7258689-41D2-4284-AF93-050DD1DFEAC4}"
+EndProject
 Global
     GlobalSection(SolutionConfigurationPlatforms) = preSolution
         Debug|Any CPU = Debug|Any CPU
@@ -1481,6 +1483,18 @@ Global
         {B196FBFE-0DAA-4533-9A56-BB5826A57923}.Release|x64.Build.0 = Release|Any CPU
         {B196FBFE-0DAA-4533-9A56-BB5826A57923}.Release|x86.ActiveCfg = Release|Any CPU
         {B196FBFE-0DAA-4533-9A56-BB5826A57923}.Release|x86.Build.0 = Release|Any CPU
+        {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+        {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|Any CPU.Build.0 = Debug|Any CPU
+        {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|x64.ActiveCfg = Debug|Any CPU
+        {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|x64.Build.0 = Debug|Any CPU
+        {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|x86.ActiveCfg = Debug|Any CPU
+        {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|x86.Build.0 = Debug|Any CPU
+        {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|Any CPU.ActiveCfg = Release|Any CPU
+        {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|Any CPU.Build.0 = Release|Any CPU
+        {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x64.ActiveCfg = Release|Any CPU
+        {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x64.Build.0 = Release|Any CPU
+        {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x86.ActiveCfg = Release|Any CPU
+        {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x86.Build.0 = Release|Any CPU
     EndGlobalSection
     GlobalSection(SolutionProperties) = preSolution
         HideSolutionNode = FALSE
@@ -1616,6 +1630,7 @@ Global
         {B196FBFE-0DAA-4533-9A56-BB5826A57923} = {ADE7A14E-FFE9-4137-AC25-E2F2A82B0A8C}
         {CDCE3D4C-1BDD-460F-93B8-75123A258183} = {ADE7A14E-FFE9-4137-AC25-E2F2A82B0A8C}
         {087E5441-1582-4D55-8233-014C0FB06FF0} = {CDCE3D4C-1BDD-460F-93B8-75123A258183}
+        {B7258689-41D2-4284-AF93-050DD1DFEAC4} = {0F3AB331-C042-4371-A2F0-0AFDFA13DC9F}
     EndGlobalSection
     GlobalSection(ExtensibilityGlobals) = postSolution
         SolutionGuid = {CD0D1E44-8118-4682-8793-6B20ABFA824C}
23 changes: 23 additions & 0 deletions benchmarks/EndpointManagerTest/Dockerfile
@@ -0,0 +1,23 @@
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
USER $APP_UID
WORKDIR /app

FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
ARG BUILD_CONFIGURATION=Release
WORKDIR /src
COPY ["benchmarks/EndpointManagerTest/EndpointManagerTest.csproj", "benchmarks/EndpointManagerTest/"]
COPY ["src/Proto.Actor/Proto.Actor.csproj", "src/Proto.Actor/"]
COPY ["src/Proto.Remote/Proto.Remote.csproj", "src/Proto.Remote/"]
RUN dotnet restore "benchmarks/EndpointManagerTest/EndpointManagerTest.csproj"
COPY . .
WORKDIR "/src/benchmarks/EndpointManagerTest"
RUN dotnet build "EndpointManagerTest.csproj" -c $BUILD_CONFIGURATION -o /app/build

FROM build AS publish
ARG BUILD_CONFIGURATION=Release
RUN dotnet publish "EndpointManagerTest.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "EndpointManagerTest.dll"]
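The Dockerfile expects the repository root as its build context, since the COPY steps reference benchmarks/ and src/. A typical invocation might look like this (the image tag is illustrative):

docker build -f benchmarks/EndpointManagerTest/Dockerfile -t endpointmanagertest .
docker run --rm --cpus="1" endpointmanagertest

The --cpus="1" limit mirrors the advice in the repro's comments: constraining the container to a single CPU makes the thread-pool saturation much easier to reproduce.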
22 changes: 22 additions & 0 deletions benchmarks/EndpointManagerTest/EndpointManagerTest.csproj
@@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

    <PropertyGroup>
        <OutputType>Exe</OutputType>
        <TargetFramework>net8.0</TargetFramework>
        <ImplicitUsings>enable</ImplicitUsings>
        <Nullable>enable</Nullable>
        <DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
    </PropertyGroup>

    <ItemGroup>
        <ProjectReference Include="..\..\src\Proto.Actor\Proto.Actor.csproj" />
        <ProjectReference Include="..\..\src\Proto.Remote\Proto.Remote.csproj" />
    </ItemGroup>

    <ItemGroup>
        <Content Include="..\..\.dockerignore">
            <Link>.dockerignore</Link>
        </Content>
    </ItemGroup>

</Project>
117 changes: 117 additions & 0 deletions benchmarks/EndpointManagerTest/Program.cs
@@ -0,0 +1,117 @@
using System.Diagnostics;
using Microsoft.Extensions.Logging;
using Proto;
using Proto.Remote;
using Proto.Remote.GrpcNet;

namespace EndpointManagerTest;

// This program tests lockup issues with the EndpointManager.
// TL;DR: this demonstrates the issue with the locking and blocking waits in EndpointManager, and confirms the fix.
//
// It recreates a scenario we were seeing in our production environments.
// What we saw: 30 cluster clients were sending many messages to 2 of the cluster members, which were in turn sending messages to each other
// depending on actor placement. If something happened and the 2 members had to reboot, they would lock up and be unable to do anything.
// That scenario is recreated more simply here: 2 members send many messages back and forth, a disconnect comes through
// from a member that recently restarted, and new connections are opened to other members. Put all of this together and we end up in a situation
// where many threads get stuck at a lock in EndpointManager, while the one thread inside the lock is waiting for a ServerConnector to stop.
// NOTE: this can be a bit flaky, since we are trying to reproduce a complete thread lockup, so there is a Dockerfile to run it in a more
// consistent environment. Using `--cpus="1"` with docker makes it even more consistent, but sometimes it takes a few tries to repro.
// You will know you reproduced it when you stop seeing "This should log every second." every second. You may also see the built-in
// "ThreadPool is running hot" log, but the absence of that log is ambiguous: if the pool is locked up, the check never finishes, so it cannot log how long it took!
// Another indicator: the new connections made at the end should log terminations and reconnects and then quickly give up (since the targets don't exist),
// but of course none of that happens while the process is locked up. Seeing a "terminating" message without a corresponding "terminated" message
// also indicates that you're locked up.
class Program
{
    private static async Task Main()
    {
        Log.SetLoggerFactory(
            LoggerFactory.Create(
                c =>
                    c.SetMinimumLevel(LogLevel.Debug)
                        .AddFilter("Microsoft", LogLevel.None)
                        .AddFilter("Grpc", LogLevel.None)
                        .AddFilter("Proto.Context.ActorContext", LogLevel.Information)
                        .AddFilter("Proto.Diagnostics.DiagnosticsStore", LogLevel.Warning)
                        .AddFilter("Proto.Remote.ServerConnector", LogLevel.Error)
                        .AddSimpleConsole(o => o.SingleLine = true)
            )
        );

        var logger = Log.CreateLogger("Main");

        _ = Task.Factory.StartNew(async () =>
        {
            while (true)
            {
                try
                {
                    await Task.Factory.StartNew(async () => { await Task.Yield(); });
                }
                catch (Exception)
                {
                }

                logger.LogInformation("This should log every second [pending: {pendingWorkItems}].", ThreadPool.PendingWorkItemCount);
                await Task.Delay(1000);
            }
        });

        var sys1 = new ActorSystem().WithRemote(GrpcNetRemoteConfig.BindTo("localhost", 12000).WithRemoteKind("noop", Props.FromProducer(() => new NoopActor())));
        await sys1.Remote().StartAsync();

        var sys2 = new ActorSystem().WithRemote(GrpcNetRemoteConfig.BindTo("localhost", 12001).WithRemoteKind("noop", Props.FromProducer(() => new NoopActor())));
        await sys2.Remote().StartAsync();

        var echoActorOn2 = (await sys1.Remote().SpawnAsync("localhost:12001", "noop", TimeSpan.FromSeconds(1))).Pid;
        _ = Task.Factory.StartNew(async () =>
        {
            while (true)
            {
                for (var i = 0; i < 200; i++)
                {
                    _ = sys1.Root.RequestAsync<Touched>(echoActorOn2, new Touch());
                }
                await Task.Yield();
            }
        });

        var echoActorOn1 = (await sys2.Remote().SpawnAsync("localhost:12000", "noop", TimeSpan.FromSeconds(1))).Pid;
        _ = Task.Factory.StartNew(async () =>
        {
            while (true)
            {
                for (var i = 0; i < 200; i++)
                {
                    _ = sys2.Root.RequestAsync<Touched>(echoActorOn1, new Touch());
                }
                await Task.Yield();
            }
        });

        await Task.Delay(3000);

        sys1.EventStream.Publish(new EndpointTerminatedEvent(false, "localhost:12001", null));

        for (var i = 12002; i < 12032; i++)
        {
            //logger.LogInformation("Touching {i}", i);
            _ = sys1.Root.RequestAsync<Touched>(new PID($"localhost:{i}", "$1"), new Touch());
        }

        while (true)
        {
            //logger.LogInformation("End");
            await Task.Delay(1000);
        }
    }
}

public class NoopActor : IActor
{
    public Task ReceiveAsync(IContext context) => Task.CompletedTask;
}
99 changes: 56 additions & 43 deletions src/Proto.Remote/Endpoints/EndpointManager.cs
@@ -53,10 +53,10 @@ public EndpointManager(ActorSystem system, RemoteConfigBase remoteConfig, IChann

     public void Start() => SpawnActivator();
 
-    public void Stop()
+    public async Task StopAsync()
     {
         lock (_synLock)
         {
             if (CancellationToken.IsCancellationRequested)
             {
                 return;
@@ -67,69 +67,72 @@ public void Stop()
             _system.EventStream.Unsubscribe(_endpointTerminatedEvnSub);
 
             _cancellationTokenSource.Cancel();
+        }
+
+        // release the lock while we dispose, other threads will see the cancellation token and return blocked endpoint.
+        foreach (var endpoint in _serverEndpoints.Values)
+        {
+            await endpoint.DisposeAsync().ConfigureAwait(false);
+        }
 
-            foreach (var endpoint in _serverEndpoints.Values)
-            {
-                endpoint.DisposeAsync().GetAwaiter().GetResult();
-            }
-
-            foreach (var endpoint in _clientEndpoints.Values)
-            {
-                endpoint.DisposeAsync().GetAwaiter().GetResult();
-            }
+        foreach (var endpoint in _clientEndpoints.Values)
+        {
+            await endpoint.DisposeAsync().ConfigureAwait(false);
+        }
 
-            _serverEndpoints.Clear();
-            _clientEndpoints.Clear();
+        _serverEndpoints.Clear();
+        _clientEndpoints.Clear();
 
-            StopActivator();
+        StopActivator();
 
-            Logger.LogDebug("[{SystemAddress}] Stopped", _system.Address);
-        }
+        Logger.LogDebug("[{SystemAddress}] Stopped", _system.Address);
     }

-    private void OnEndpointTerminated(EndpointTerminatedEvent evt)
+    private async Task OnEndpointTerminated(EndpointTerminatedEvent evt)
     {
         if (Logger.IsEnabled(LogLevel.Debug))
         {
             Logger.LogDebug("[{SystemAddress}] Endpoint {Address} terminating", _system.Address,
                 evt.Address ?? evt.ActorSystemId);
         }
 
+        Action? unblock = null;
+        IEndpoint? endpoint = null;
         lock (_synLock)
        {
-            if (evt.Address is not null && _serverEndpoints.TryRemove(evt.Address, out var endpoint))
+            if (_cancellationTokenSource.IsCancellationRequested)
             {
-                endpoint.DisposeAsync().GetAwaiter().GetResult();
-
-                if (evt.ShouldBlock && _remoteConfig.WaitAfterEndpointTerminationTimeSpan.HasValue &&
-                    _blockedAddresses.TryAdd(evt.Address, DateTime.UtcNow))
-                {
-                    _ = SafeTask.Run(async () =>
-                    {
-                        await Task.Delay(_remoteConfig.WaitAfterEndpointTerminationTimeSpan.Value)
-                            .ConfigureAwait(false);
-
-                        _blockedAddresses.TryRemove(evt.Address, out _);
-                    });
-                }
+                return;
+            }
+
+            if (evt.Address is not null && _serverEndpoints.TryRemove(evt.Address, out endpoint))
+            {
+                _blockedAddresses.TryAdd(evt.Address, DateTime.UtcNow);
+                unblock = () => _blockedAddresses.TryRemove(evt.Address, out _);
             }
 
             if (evt.ActorSystemId is not null && _clientEndpoints.TryRemove(evt.ActorSystemId, out endpoint))
             {
-                endpoint.DisposeAsync().GetAwaiter().GetResult();
-
-                if (evt.ShouldBlock && _remoteConfig.WaitAfterEndpointTerminationTimeSpan.HasValue &&
-                    _blockedClientSystemIds.TryAdd(evt.ActorSystemId, DateTime.UtcNow))
-                {
-                    _ = SafeTask.Run(async () =>
-                    {
-                        await Task.Delay(_remoteConfig.WaitAfterEndpointTerminationTimeSpan.Value)
-                            .ConfigureAwait(false);
-
-                        _blockedClientSystemIds.TryRemove(evt.ActorSystemId, out _);
-                    });
-                }
+                _blockedClientSystemIds.TryAdd(evt.ActorSystemId, DateTime.UtcNow);
+                unblock = () => _blockedClientSystemIds.TryRemove(evt.ActorSystemId, out _);
+            }
+        }
+
+        if (endpoint != null)
+        {
+            // leave the lock to dispose the endpoint, so that requests can't build up behind the lock
+            // the address will always be blocked while we dispose, at a minimum
+            await endpoint.DisposeAsync().ConfigureAwait(false);
+            if (evt.ShouldBlock && _remoteConfig.WaitAfterEndpointTerminationTimeSpan.HasValue)
+            {
+                await Task.Delay(_remoteConfig.WaitAfterEndpointTerminationTimeSpan.Value, CancellationToken).ConfigureAwait(false);
+                if (_cancellationTokenSource.IsCancellationRequested)
                 {
+                    return;
                 }
             }
-        }
+
+            unblock?.Invoke();
+        }
 
         Logger.LogDebug("[{SystemAddress}] Endpoint {Address} terminated", _system.Address,
@@ -157,11 +160,16 @@ internal IEndpoint GetOrAddServerEndpoint(string? address)
 
         lock (_synLock)
         {
+            if (_cancellationTokenSource.IsCancellationRequested || _blockedAddresses.ContainsKey(address))
+            {
+                return _blockedEndpoint;
+            }
+
             if (_serverEndpoints.TryGetValue(address, out endpoint))
             {
                 return endpoint;
             }
 
             if (_system.Address.StartsWith(ActorSystem.Client, StringComparison.Ordinal))
             {
                 if (Logger.IsEnabled(LogLevel.Debug))
@@ -212,6 +220,11 @@ internal IEndpoint GetOrAddClientEndpoint(string systemId)
 
         lock (_synLock)
         {
+            if (_cancellationTokenSource.IsCancellationRequested || _blockedClientSystemIds.ContainsKey(systemId))
+            {
+                return _blockedEndpoint;
+            }
+
             if (_clientEndpoints.TryGetValue(systemId, out endpoint))
             {
                 return endpoint;
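The shape of the EndpointManager fix, reduced to a pattern (illustrative types, not the real API; IAsyncDisposable stands in for IEndpoint): hold the lock only long enough to mutate the maps and capture what must be disposed, then await the disposal outside the lock, so concurrent callers observe the blocked state and return immediately instead of piling up behind a blocking wait.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class EndpointCachePattern
{
    private readonly object _synLock = new();
    private readonly ConcurrentDictionary<string, IAsyncDisposable> _endpoints = new();

    // Before the fix: endpoint.DisposeAsync().GetAwaiter().GetResult() ran while holding
    // _synLock, so every other caller parked behind the lock until disposal finished.
    // After the fix: the lock covers only the map mutation; the await happens outside it.
    public async Task TerminateAsync(string address)
    {
        IAsyncDisposable? endpoint;
        lock (_synLock)
        {
            _endpoints.TryRemove(address, out endpoint);
        }

        if (endpoint != null)
        {
            await endpoint.DisposeAsync().ConfigureAwait(false);
        }
    }
}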
13 changes: 10 additions & 3 deletions src/Proto.Remote/Endpoints/EndpointReader.cs
@@ -42,6 +42,8 @@ ServerCallContext context
             throw new RpcException(Status.DefaultCancelled, "Suspended");
         }
 
+        var cancellationTokenSource = new CancellationTokenSource();
+
         async void Disconnect()
         {
             try
@@ -60,6 +62,13 @@ async void Disconnect()
                 Logger.LogWarning("[EndpointReader][{SystemAddress}] Failed to write disconnect message to the stream",
                     _system.Address);
             }
+            finally
+            {
+                // When we disconnect, cancel the token, so the reader and writer both stop, and this method returns,
+                // so that the stream actually closes. Without this, when kestrel begins shutdown, it's possible the
+                // connection will stay open until the kestrel shutdown timeout is reached.
+                cancellationTokenSource.Cancel();
+            }
         }
 
         await using (_endpointManager.CancellationToken.Register(Disconnect).ConfigureAwait(false))
@@ -81,8 +90,6 @@ async void Disconnect()

             var connectRequest = requestStream.Current.ConnectRequest;
 
-            var cancellationTokenSource = new CancellationTokenSource();
-
             switch (connectRequest.ConnectionTypeCase)
             {
                 case ConnectRequest.ConnectionTypeOneofCase.ClientConnection:
@@ -205,7 +212,7 @@ private async Task RunReader(IAsyncStreamReader<RemoteMessage> requestStream, st
     {
         try
         {
-            while (await requestStream.MoveNext(CancellationToken.None).ConfigureAwait(false))
+            while (await requestStream.MoveNext(cancellationTokenSource.Token).ConfigureAwait(false))
             {
                 var currentMessage = requestStream.Current;
 
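The EndpointReader change follows the standard cooperative-cancellation pattern for gRPC streaming: one CancellationTokenSource is shared by the disconnect path and the read loop, so cancelling it unblocks the pending MoveNext and lets the handler return. A reduced sketch (the generic message type and method shape are hypothetical, not the real Proto.Remote signatures):

using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;

static class ReaderPattern
{
    public static async Task RunReader<T>(
        IAsyncStreamReader<T> requestStream,
        CancellationTokenSource cancellationTokenSource)
    {
        try
        {
            // Passing the shared token means the disconnect path can abort this read.
            while (await requestStream.MoveNext(cancellationTokenSource.Token).ConfigureAwait(false))
            {
                // handle requestStream.Current here
            }
        }
        catch (OperationCanceledException)
        {
            // expected when the disconnect path cancels the token
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
        {
            // some transports surface the cancellation as an RpcException instead
        }
    }
}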