From 3b75ecbeb96897df45066d0f7a2cb3b9227d60fe Mon Sep 17 00:00:00 2001 From: benbenwilde Date: Fri, 1 Nov 2024 21:17:45 -0500 Subject: [PATCH 1/7] Add endpoint manager test to repro thread lockup fix merge --- ProtoActor.sln | 15 ++++ .../EndpointManagerTest.csproj | 15 ++++ benchmarks/EndpointManagerTest/Program.cs | 74 +++++++++++++++++++ 3 files changed, 104 insertions(+) create mode 100644 benchmarks/EndpointManagerTest/EndpointManagerTest.csproj create mode 100644 benchmarks/EndpointManagerTest/Program.cs diff --git a/ProtoActor.sln b/ProtoActor.sln index 72445911ca..cf661607a2 100644 --- a/ProtoActor.sln +++ b/ProtoActor.sln @@ -307,6 +307,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "template", "template", "{08 examples\ClusterK8sGrains\chart\templates\protoactor-k8s-grains-serviceaccount.yaml = examples\ClusterK8sGrains\chart\templates\protoactor-k8s-grains-serviceaccount.yaml EndProjectSection EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EndpointManagerTest", "benchmarks\EndpointManagerTest\EndpointManagerTest.csproj", "{B7258689-41D2-4284-AF93-050DD1DFEAC4}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -1481,6 +1483,18 @@ Global {B196FBFE-0DAA-4533-9A56-BB5826A57923}.Release|x64.Build.0 = Release|Any CPU {B196FBFE-0DAA-4533-9A56-BB5826A57923}.Release|x86.ActiveCfg = Release|Any CPU {B196FBFE-0DAA-4533-9A56-BB5826A57923}.Release|x86.Build.0 = Release|Any CPU + {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|x64.ActiveCfg = Debug|Any CPU + {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|x64.Build.0 = Debug|Any CPU + {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|x86.ActiveCfg = Debug|Any CPU + {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Debug|x86.Build.0 = Debug|Any CPU + {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|Any CPU.Build.0 = Release|Any CPU + {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x64.ActiveCfg = Release|Any CPU + {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x64.Build.0 = Release|Any CPU + {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x86.ActiveCfg = Release|Any CPU + {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1616,6 +1630,7 @@ Global {B196FBFE-0DAA-4533-9A56-BB5826A57923} = {ADE7A14E-FFE9-4137-AC25-E2F2A82B0A8C} {CDCE3D4C-1BDD-460F-93B8-75123A258183} = {ADE7A14E-FFE9-4137-AC25-E2F2A82B0A8C} {087E5441-1582-4D55-8233-014C0FB06FF0} = {CDCE3D4C-1BDD-460F-93B8-75123A258183} + {B7258689-41D2-4284-AF93-050DD1DFEAC4} = {0F3AB331-C042-4371-A2F0-0AFDFA13DC9F} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {CD0D1E44-8118-4682-8793-6B20ABFA824C} diff --git a/benchmarks/EndpointManagerTest/EndpointManagerTest.csproj b/benchmarks/EndpointManagerTest/EndpointManagerTest.csproj new file mode 100644 index 0000000000..eb377655de --- /dev/null +++ b/benchmarks/EndpointManagerTest/EndpointManagerTest.csproj @@ -0,0 +1,15 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + diff --git a/benchmarks/EndpointManagerTest/Program.cs b/benchmarks/EndpointManagerTest/Program.cs new file mode 100644 index 0000000000..270725fdb9 --- /dev/null +++ b/benchmarks/EndpointManagerTest/Program.cs @@ -0,0 +1,74 @@ +using Microsoft.Extensions.Logging; +using Proto; +using Proto.Remote; +using Proto.Remote.GrpcNet; + +namespace EndpointManagerTest; + +class Program +{ + private static async Task Main() + { + Log.SetLoggerFactory( + LoggerFactory.Create( + c => + c.SetMinimumLevel(LogLevel.Debug) + .AddFilter("Microsoft", LogLevel.None) + .AddFilter("Grpc", LogLevel.None) + .AddFilter("Proto.Context.ActorContext", LogLevel.Information) + .AddFilter("Proto.Remote.ServerConnector", LogLevel.Error) + .AddSimpleConsole(o => o.SingleLine = true) + ) + ); + + var logger = Log.CreateLogger("Main"); + + var sys1 = new ActorSystem().WithRemote(GrpcNetRemoteConfig.BindTo("localhost", 12000).WithRemoteKind("noop", Props.FromProducer(() => new NoopActor()))); + await sys1.Remote().StartAsync(); + + var sys2 = new ActorSystem().WithRemote(GrpcNetRemoteConfig.BindTo("localhost", 12001).WithRemoteKind("noop", Props.FromProducer(() => new NoopActor()))); + await sys2.Remote().StartAsync(); + + var echoActorOn2 = (await sys1.Remote().SpawnAsync("localhost:12001", "noop", TimeSpan.FromSeconds(1))).Pid; + _ = Task.Factory.StartNew(async () => + { + while (true) + { + _ = sys1.Root.RequestAsync(echoActorOn2, new Touch()); + } + }); + + var echoActorOn1 = (await sys2.Remote().SpawnAsync("localhost:12000", "noop", TimeSpan.FromSeconds(1))).Pid; + _ = Task.Factory.StartNew(async () => + { + while (true) + { + _ = sys2.Root.RequestAsync(echoActorOn1, new Touch()); + } + }); + + await Task.Delay(3000); + + sys1.EventStream.Publish(new EndpointTerminatedEvent(false, "localhost:12001", null)); + + var port = 12002; + for (var i = 12002; i < 12012; i++) + { + //logger.LogInformation("Touching {i}", i); + _ = sys1.Root.RequestAsync(new PID($"localhost:{i}", "$1"), new Touch()); + } + + while (true) + { + //logger.LogInformation("End"); + await Task.Delay(1000); + } + } +} + +public class NoopActor : IActor +{ + public async Task ReceiveAsync(IContext context) + { + } +} \ No newline at end of file From df4591b7c888f6413926c8f4b1f287e0aaa75b39 Mon Sep 17 00:00:00 2001 From: benbenwilde Date: Fri, 1 Nov 2024 22:30:23 -0500 Subject: [PATCH 2/7] add explanation and dockerfile --- benchmarks/EndpointManagerTest/Dockerfile | 23 +++++++++ .../EndpointManagerTest.csproj | 7 +++ benchmarks/EndpointManagerTest/Program.cs | 51 +++++++++++++++++-- 3 files changed, 77 insertions(+), 4 deletions(-) create mode 100644 benchmarks/EndpointManagerTest/Dockerfile diff --git a/benchmarks/EndpointManagerTest/Dockerfile b/benchmarks/EndpointManagerTest/Dockerfile new file mode 100644 index 0000000000..ed6318e6e1 --- /dev/null +++ b/benchmarks/EndpointManagerTest/Dockerfile @@ -0,0 +1,23 @@ +FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base +USER $APP_UID +WORKDIR /app + +FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build +ARG BUILD_CONFIGURATION=Release +WORKDIR /src +COPY ["benchmarks/EndpointManagerTest/EndpointManagerTest.csproj", "benchmarks/EndpointManagerTest/"] +COPY ["src/Proto.Actor/Proto.Actor.csproj", "src/Proto.Actor/"] +COPY ["src/Proto.Remote/Proto.Remote.csproj", "src/Proto.Remote/"] +RUN dotnet restore "benchmarks/EndpointManagerTest/EndpointManagerTest.csproj" +COPY . . +WORKDIR "/src/benchmarks/EndpointManagerTest" +RUN dotnet build "EndpointManagerTest.csproj" -c $BUILD_CONFIGURATION -o /app/build + +FROM build AS publish +ARG BUILD_CONFIGURATION=Release +RUN dotnet publish "EndpointManagerTest.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false + +FROM base AS final +WORKDIR /app +COPY --from=publish /app/publish . +ENTRYPOINT ["dotnet", "EndpointManagerTest.dll"] diff --git a/benchmarks/EndpointManagerTest/EndpointManagerTest.csproj b/benchmarks/EndpointManagerTest/EndpointManagerTest.csproj index eb377655de..27ae7a51bf 100644 --- a/benchmarks/EndpointManagerTest/EndpointManagerTest.csproj +++ b/benchmarks/EndpointManagerTest/EndpointManagerTest.csproj @@ -5,11 +5,18 @@ net8.0 enable enable + Linux + + + + .dockerignore + + diff --git a/benchmarks/EndpointManagerTest/Program.cs b/benchmarks/EndpointManagerTest/Program.cs index 270725fdb9..710c4a95b2 100644 --- a/benchmarks/EndpointManagerTest/Program.cs +++ b/benchmarks/EndpointManagerTest/Program.cs @@ -1,10 +1,27 @@ -using Microsoft.Extensions.Logging; +using System.Diagnostics; +using Microsoft.Extensions.Logging; using Proto; using Proto.Remote; using Proto.Remote.GrpcNet; namespace EndpointManagerTest; +// This program tests lockup issues with the EndpointManager. +// TL:DR; This is to demonstrate the issue with the locking and blocking waits in EndpointManager, and to confirm the fix. +// +// This recreates a scenario we were seeing in our production environments. +// What we saw was 30 cluster clients were sending many messages to 2 of the cluster members, who were sending messages to eachother depending +// on actor placement. If something happens and the 2 members had to reboot, they would end up locking up, not being able to do anything. +// This scenario has been recreated more simply here, where you have 2 members sending many messages back and forth, a disconnect comes through +// from a member that recently restarted, and new connections are being opened to other members. Putting all these together, we end up in a situation +// where many threads get stuck at a lock in EndpointManager, while the one thread inside of the lock is waiting for a ServerConnector to stop. +// NOTE: that this can be a bit flakey as we are trying to reproduce a complete thread lockup. So there is a dockerfile to run it in a more consistent +// environment. Using `--cpus="1"` with docker will make it even more consistent, but sometimes it takes a few tries to repro. +// You will know you reproduced it when you stop seeing "This should log every second." every second. you may also see the built in +// "ThreadPool is running hot" log, but the absence of that log is ambiguous, since if it's locked up it won't finish to log how long it took! +// The other indicator is that all the new connections made at the end should be logging terminations and reconnects and quickly give up (since they don't exist), +// but of course that won't be happening when you're locked up. Also seeing any "terminating" messages without a corresponding "terminated" message +// also indicates that you're locked up. class Program { private static async Task Main() @@ -16,6 +33,7 @@ private static async Task Main() .AddFilter("Microsoft", LogLevel.None) .AddFilter("Grpc", LogLevel.None) .AddFilter("Proto.Context.ActorContext", LogLevel.Information) + .AddFilter("Proto.Diagnostics.DiagnosticsStore", LogLevel.Warning) .AddFilter("Proto.Remote.ServerConnector", LogLevel.Error) .AddSimpleConsole(o => o.SingleLine = true) ) @@ -23,6 +41,23 @@ private static async Task Main() var logger = Log.CreateLogger("Main"); + _ = Task.Factory.StartNew(async () => + { + while (true) + { + try + { + await Task.Factory.StartNew(async () => { await Task.Yield(); }); + } + catch (Exception) + { + } + + logger.LogInformation("This should log every second [pending: {pendingWorkItems}].", ThreadPool.PendingWorkItemCount); + await Task.Delay(1000); + } + }); + var sys1 = new ActorSystem().WithRemote(GrpcNetRemoteConfig.BindTo("localhost", 12000).WithRemoteKind("noop", Props.FromProducer(() => new NoopActor()))); await sys1.Remote().StartAsync(); @@ -34,7 +69,11 @@ private static async Task Main() { while (true) { - _ = sys1.Root.RequestAsync(echoActorOn2, new Touch()); + for (var i = 0; i < 200; i++) + { + _ = sys1.Root.RequestAsync(echoActorOn2, new Touch()); + } + await Task.Yield(); } }); @@ -43,7 +82,11 @@ private static async Task Main() { while (true) { - _ = sys2.Root.RequestAsync(echoActorOn1, new Touch()); + for (var i = 0; i < 200; i++) + { + _ = sys2.Root.RequestAsync(echoActorOn1, new Touch()); + } + await Task.Yield(); } }); @@ -52,7 +95,7 @@ private static async Task Main() sys1.EventStream.Publish(new EndpointTerminatedEvent(false, "localhost:12001", null)); var port = 12002; - for (var i = 12002; i < 12012; i++) + for (var i = 12002; i < 12032; i++) { //logger.LogInformation("Touching {i}", i); _ = sys1.Root.RequestAsync(new PID($"localhost:{i}", "$1"), new Touch()); From f321c1e269dbf4c2f62f6f34bed228c58c8a1289 Mon Sep 17 00:00:00 2001 From: benbenwilde Date: Mon, 4 Nov 2024 16:59:17 -0600 Subject: [PATCH 3/7] Block endpoint while it disposes instead of holding requests behind a lock. this also allows messages to other endpoints while one is disposing, as well as multiple endpoint disposes at the same time. --- src/Proto.Remote/Endpoints/EndpointManager.cs | 97 +++++++++++-------- 1 file changed, 55 insertions(+), 42 deletions(-) diff --git a/src/Proto.Remote/Endpoints/EndpointManager.cs b/src/Proto.Remote/Endpoints/EndpointManager.cs index e564fb422f..5765edd7fd 100644 --- a/src/Proto.Remote/Endpoints/EndpointManager.cs +++ b/src/Proto.Remote/Endpoints/EndpointManager.cs @@ -56,7 +56,7 @@ public EndpointManager(ActorSystem system, RemoteConfigBase remoteConfig, IChann public void Stop() { lock (_synLock) - { + { if (CancellationToken.IsCancellationRequested) { return; @@ -67,27 +67,28 @@ public void Stop() _system.EventStream.Unsubscribe(_endpointTerminatedEvnSub); _cancellationTokenSource.Cancel(); + } + + // release the lock while we dispose, other threads will see the cancellation token and return blocked endpoint. + foreach (var endpoint in _serverEndpoints.Values) + { + endpoint.DisposeAsync().GetAwaiter().GetResult(); + } - foreach (var endpoint in _serverEndpoints.Values) - { - endpoint.DisposeAsync().GetAwaiter().GetResult(); - } - - foreach (var endpoint in _clientEndpoints.Values) - { - endpoint.DisposeAsync().GetAwaiter().GetResult(); - } + foreach (var endpoint in _clientEndpoints.Values) + { + endpoint.DisposeAsync().GetAwaiter().GetResult(); + } - _serverEndpoints.Clear(); - _clientEndpoints.Clear(); + _serverEndpoints.Clear(); + _clientEndpoints.Clear(); - StopActivator(); + StopActivator(); - Logger.LogDebug("[{SystemAddress}] Stopped", _system.Address); - } + Logger.LogDebug("[{SystemAddress}] Stopped", _system.Address); } - private void OnEndpointTerminated(EndpointTerminatedEvent evt) + private async Task OnEndpointTerminated(EndpointTerminatedEvent evt) { if (Logger.IsEnabled(LogLevel.Debug)) { @@ -95,41 +96,43 @@ private void OnEndpointTerminated(EndpointTerminatedEvent evt) evt.Address ?? evt.ActorSystemId); } + Action? unblock = null; + IEndpoint? endpoint = null; lock (_synLock) { - if (evt.Address is not null && _serverEndpoints.TryRemove(evt.Address, out var endpoint)) + if (_cancellationTokenSource.IsCancellationRequested) { - endpoint.DisposeAsync().GetAwaiter().GetResult(); - - if (evt.ShouldBlock && _remoteConfig.WaitAfterEndpointTerminationTimeSpan.HasValue && - _blockedAddresses.TryAdd(evt.Address, DateTime.UtcNow)) - { - _ = SafeTask.Run(async () => - { - await Task.Delay(_remoteConfig.WaitAfterEndpointTerminationTimeSpan.Value) - .ConfigureAwait(false); - - _blockedAddresses.TryRemove(evt.Address, out _); - }); - } + return; + } + + if (evt.Address is not null && _serverEndpoints.TryRemove(evt.Address, out endpoint)) + { + _blockedAddresses.TryAdd(evt.Address, DateTime.UtcNow); + unblock = () => _blockedAddresses.TryRemove(evt.Address, out _); } if (evt.ActorSystemId is not null && _clientEndpoints.TryRemove(evt.ActorSystemId, out endpoint)) { - endpoint.DisposeAsync().GetAwaiter().GetResult(); - - if (evt.ShouldBlock && _remoteConfig.WaitAfterEndpointTerminationTimeSpan.HasValue && - _blockedClientSystemIds.TryAdd(evt.ActorSystemId, DateTime.UtcNow)) + _blockedClientSystemIds.TryAdd(evt.ActorSystemId, DateTime.UtcNow); + unblock = () => _blockedClientSystemIds.TryRemove(evt.ActorSystemId, out _); + } + } + + if (endpoint != null) + { + // leave the lock to dispose the endpoint, so that requests can't build up behind the lock + // the address will always be blocked while we dispose, at a minimum + await endpoint.DisposeAsync().ConfigureAwait(false); + if (evt.ShouldBlock && _remoteConfig.WaitAfterEndpointTerminationTimeSpan.HasValue) + { + await Task.Delay(_remoteConfig.WaitAfterEndpointTerminationTimeSpan.Value, CancellationToken).ConfigureAwait(false); + if (_cancellationTokenSource.IsCancellationRequested) { - _ = SafeTask.Run(async () => - { - await Task.Delay(_remoteConfig.WaitAfterEndpointTerminationTimeSpan.Value) - .ConfigureAwait(false); - - _blockedClientSystemIds.TryRemove(evt.ActorSystemId, out _); - }); + return; } } + + unblock?.Invoke(); } Logger.LogDebug("[{SystemAddress}] Endpoint {Address} terminated", _system.Address, @@ -157,11 +160,16 @@ internal IEndpoint GetOrAddServerEndpoint(string? address) lock (_synLock) { + if (_cancellationTokenSource.IsCancellationRequested || _blockedAddresses.ContainsKey(address)) + { + return _blockedEndpoint; + } + if (_serverEndpoints.TryGetValue(address, out endpoint)) { return endpoint; } - + if (_system.Address.StartsWith(ActorSystem.Client, StringComparison.Ordinal)) { if (Logger.IsEnabled(LogLevel.Debug)) @@ -212,6 +220,11 @@ internal IEndpoint GetOrAddClientEndpoint(string systemId) lock (_synLock) { + if (_cancellationTokenSource.IsCancellationRequested || _blockedClientSystemIds.ContainsKey(systemId)) + { + return _blockedEndpoint; + } + if (_clientEndpoints.TryGetValue(systemId, out endpoint)) { return endpoint; From d1e2ce51038bb1d05c68111e8e5be9fa5e5ecaeb Mon Sep 17 00:00:00 2001 From: benbenwilde Date: Mon, 4 Nov 2024 23:03:16 -0600 Subject: [PATCH 4/7] Change EndpointManager Stop to StopAsync --- src/Proto.Remote/Endpoints/EndpointManager.cs | 6 +-- .../GrpcNet/GrpcNetClientRemote.cs | 8 ++-- src/Proto.Remote/GrpcNet/GrpcNetRemote.cs | 2 +- .../GrpcNet/HostedGrpcNetRemote.cs | 40 +++++++++---------- 4 files changed, 26 insertions(+), 30 deletions(-) diff --git a/src/Proto.Remote/Endpoints/EndpointManager.cs b/src/Proto.Remote/Endpoints/EndpointManager.cs index 5765edd7fd..8bee4c0728 100644 --- a/src/Proto.Remote/Endpoints/EndpointManager.cs +++ b/src/Proto.Remote/Endpoints/EndpointManager.cs @@ -53,7 +53,7 @@ public EndpointManager(ActorSystem system, RemoteConfigBase remoteConfig, IChann public void Start() => SpawnActivator(); - public void Stop() + public async Task StopAsync() { lock (_synLock) { @@ -72,12 +72,12 @@ public void Stop() // release the lock while we dispose, other threads will see the cancellation token and return blocked endpoint. foreach (var endpoint in _serverEndpoints.Values) { - endpoint.DisposeAsync().GetAwaiter().GetResult(); + await endpoint.DisposeAsync(); } foreach (var endpoint in _clientEndpoints.Values) { - endpoint.DisposeAsync().GetAwaiter().GetResult(); + await endpoint.DisposeAsync(); } _serverEndpoints.Clear(); diff --git a/src/Proto.Remote/GrpcNet/GrpcNetClientRemote.cs b/src/Proto.Remote/GrpcNet/GrpcNetClientRemote.cs index 847c554c02..b7ad056c44 100644 --- a/src/Proto.Remote/GrpcNet/GrpcNetClientRemote.cs +++ b/src/Proto.Remote/GrpcNet/GrpcNetClientRemote.cs @@ -42,13 +42,13 @@ public GrpcNetClientRemote(ActorSystem system, GrpcNetRemoteConfig config) public BlockList BlockList { get; } public bool Started { get; private set; } - public Task ShutdownAsync(bool graceful = true) + public async Task ShutdownAsync(bool graceful = true) { lock (_lock) { if (!Started) { - return Task.CompletedTask; + return; } Started = false; @@ -58,7 +58,7 @@ public Task ShutdownAsync(bool graceful = true) { if (graceful) { - _endpointManager.Stop(); + await _endpointManager.StopAsync(); } _logger.LogInformation( @@ -73,8 +73,6 @@ public Task ShutdownAsync(bool graceful = true) System.Id, ex.Message ); } - - return Task.CompletedTask; } public Task StartAsync() diff --git a/src/Proto.Remote/GrpcNet/GrpcNetRemote.cs b/src/Proto.Remote/GrpcNet/GrpcNetRemote.cs index 1c873517f5..33e7b79aa0 100644 --- a/src/Proto.Remote/GrpcNet/GrpcNetRemote.cs +++ b/src/Proto.Remote/GrpcNet/GrpcNetRemote.cs @@ -157,7 +157,7 @@ public async Task ShutdownAsync(bool graceful = true) { if (graceful) { - _endpointManager.Stop(); + await _endpointManager.StopAsync(); if (_host is not null) { diff --git a/src/Proto.Remote/GrpcNet/HostedGrpcNetRemote.cs b/src/Proto.Remote/GrpcNet/HostedGrpcNetRemote.cs index fca7e8e3d3..eb6fd22c9e 100644 --- a/src/Proto.Remote/GrpcNet/HostedGrpcNetRemote.cs +++ b/src/Proto.Remote/GrpcNet/HostedGrpcNetRemote.cs @@ -66,37 +66,35 @@ public Task StartAsync() } } - public Task ShutdownAsync(bool graceful = true) + public async Task ShutdownAsync(bool graceful = true) { lock (_lock) { if (!Started) { - return Task.CompletedTask; + return; } - try - { - _endpointManager.Stop(); - - _logger.LogInformation( - "Proto.Actor server stopped on {Address}. Graceful: {Graceful}", - System.Address, graceful - ); - } - catch (Exception ex) - { - _logger.LogError( - ex, "Proto.Actor server stopped on {Address} with error: {MessagePayload}", - System.Address, ex.Message - ); + Started = false; + } - throw; - } + try + { + await _endpointManager.StopAsync(); - Started = false; + _logger.LogInformation( + "Proto.Actor server stopped on {Address}. Graceful: {Graceful}", + System.Address, graceful + ); + } + catch (Exception ex) + { + _logger.LogError( + ex, "Proto.Actor server stopped on {Address} with error: {MessagePayload}", + System.Address, ex.Message + ); - return Task.CompletedTask; + throw; } } } \ No newline at end of file From 9c8e464e96f3ae9e5423543878d444cca123656a Mon Sep 17 00:00:00 2001 From: benbenwilde Date: Tue, 5 Nov 2024 00:47:44 -0600 Subject: [PATCH 5/7] ensure the EndpointReader always finishes up the request after sending the DisconnectRequest, so it doesn't time out during kestrel shutdown --- src/Proto.Remote/Endpoints/EndpointManager.cs | 4 ++-- src/Proto.Remote/Endpoints/EndpointReader.cs | 13 ++++++++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/Proto.Remote/Endpoints/EndpointManager.cs b/src/Proto.Remote/Endpoints/EndpointManager.cs index 8bee4c0728..9fd4904321 100644 --- a/src/Proto.Remote/Endpoints/EndpointManager.cs +++ b/src/Proto.Remote/Endpoints/EndpointManager.cs @@ -72,12 +72,12 @@ public async Task StopAsync() // release the lock while we dispose, other threads will see the cancellation token and return blocked endpoint. foreach (var endpoint in _serverEndpoints.Values) { - await endpoint.DisposeAsync(); + await endpoint.DisposeAsync().ConfigureAwait(false); } foreach (var endpoint in _clientEndpoints.Values) { - await endpoint.DisposeAsync(); + await endpoint.DisposeAsync().ConfigureAwait(false); } _serverEndpoints.Clear(); diff --git a/src/Proto.Remote/Endpoints/EndpointReader.cs b/src/Proto.Remote/Endpoints/EndpointReader.cs index 94a1b458b6..602d3484b5 100644 --- a/src/Proto.Remote/Endpoints/EndpointReader.cs +++ b/src/Proto.Remote/Endpoints/EndpointReader.cs @@ -42,6 +42,8 @@ ServerCallContext context throw new RpcException(Status.DefaultCancelled, "Suspended"); } + var cancellationTokenSource = new CancellationTokenSource(); + async void Disconnect() { try @@ -60,6 +62,13 @@ async void Disconnect() Logger.LogWarning("[EndpointReader][{SystemAddress}] Failed to write disconnect message to the stream", _system.Address); } + finally + { + // When we disconnect, cancel the token, so the reader and writer both stop, and this method returns, + // so that the stream actually closes. Without this, when kestrel begins shutdown, it's possible the + // connection will stay open until the kestrel shutdown timeout is reached. + cancellationTokenSource.Cancel(); + } } await using (_endpointManager.CancellationToken.Register(Disconnect).ConfigureAwait(false)) @@ -81,8 +90,6 @@ async void Disconnect() var connectRequest = requestStream.Current.ConnectRequest; - var cancellationTokenSource = new CancellationTokenSource(); - switch (connectRequest.ConnectionTypeCase) { case ConnectRequest.ConnectionTypeOneofCase.ClientConnection: @@ -205,7 +212,7 @@ private async Task RunReader(IAsyncStreamReader requestStream, st { try { - while (await requestStream.MoveNext(CancellationToken.None).ConfigureAwait(false)) + while (await requestStream.MoveNext(cancellationTokenSource.Token).ConfigureAwait(false)) { var currentMessage = requestStream.Current; From 453a7d65249789bb9a0aff27fe5d49f6ee40cdb4 Mon Sep 17 00:00:00 2001 From: benbenwilde Date: Tue, 5 Nov 2024 00:57:05 -0600 Subject: [PATCH 6/7] increase timeout on a couple tests so they fail less often --- tests/Proto.Actor.Tests/DisposableActorTests.cs | 2 +- tests/Proto.Actor.Tests/SupervisionTests_OneForOne.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/Proto.Actor.Tests/DisposableActorTests.cs b/tests/Proto.Actor.Tests/DisposableActorTests.cs index aa3a41b14e..6d16d43c8e 100644 --- a/tests/Proto.Actor.Tests/DisposableActorTests.cs +++ b/tests/Proto.Actor.Tests/DisposableActorTests.cs @@ -54,7 +54,7 @@ public async Task WhenActorRestarted_DisposeAsyncIsCalled() var parent = context.Spawn(props); context.Send(parent, "crash"); - childMailboxStats.Reset.Wait(1000); + childMailboxStats.Reset.Wait(2000); Assert.True(disposeCalled); } diff --git a/tests/Proto.Actor.Tests/SupervisionTests_OneForOne.cs b/tests/Proto.Actor.Tests/SupervisionTests_OneForOne.cs index 46b4a7415c..7b003a6b87 100644 --- a/tests/Proto.Actor.Tests/SupervisionTests_OneForOne.cs +++ b/tests/Proto.Actor.Tests/SupervisionTests_OneForOne.cs @@ -79,7 +79,7 @@ public async Task OneForOneStrategy_Should_RestartChildOnFailure() context.Send(parent, "hello"); - childMailboxStats.Reset.Wait(1000); + childMailboxStats.Reset.Wait(2000); Assert.Contains(childMailboxStats.Posted, msg => msg is Restart); Assert.Contains(childMailboxStats.Received, msg => msg is Restart); } From bab930c38ca0ba71c4a3e93a93e2295cce8f607c Mon Sep 17 00:00:00 2001 From: benbenwilde Date: Tue, 5 Nov 2024 01:00:35 -0600 Subject: [PATCH 7/7] increase another timeout on flakey test --- tests/Proto.Actor.Tests/SupervisionTests_OneForOne.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/Proto.Actor.Tests/SupervisionTests_OneForOne.cs b/tests/Proto.Actor.Tests/SupervisionTests_OneForOne.cs index 7b003a6b87..2fa4323801 100644 --- a/tests/Proto.Actor.Tests/SupervisionTests_OneForOne.cs +++ b/tests/Proto.Actor.Tests/SupervisionTests_OneForOne.cs @@ -147,7 +147,7 @@ public async Task context.Send(parent, "3rd restart"); context.Send(parent, "4th restart"); - childMailboxStats.Reset.Wait(1000); + childMailboxStats.Reset.Wait(2000); Assert.Contains(Stop.Instance, childMailboxStats.Posted); Assert.Contains(Stop.Instance, childMailboxStats.Received); }