diff --git a/src/Proto.Actor/EventStream/EventStream.cs b/src/Proto.Actor/EventStream/EventStream.cs index 0368efe0b7..8846ce2156 100644 --- a/src/Proto.Actor/EventStream/EventStream.cs +++ b/src/Proto.Actor/EventStream/EventStream.cs @@ -219,7 +219,7 @@ public EventStreamSubscription Subscribe( /// A new subscription that can be used to unsubscribe public EventStreamSubscription Subscribe(ISenderContext context, params PID[] pids) where TMsg : T { - var caller = pids.First().ToDiagnosticString(); + var caller = pids.First().ToDiagnosticString().Split("/").Last(); var sub = new EventStreamSubscription( this, Dispatchers.SynchronousDispatcher, diff --git a/src/Proto.Actor/Extensions.cs b/src/Proto.Actor/Extensions.cs index ac6683c878..4f26e6a739 100644 --- a/src/Proto.Actor/Extensions.cs +++ b/src/Proto.Actor/Extensions.cs @@ -6,8 +6,11 @@ using System; using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; using JetBrains.Annotations; using Proto.Mailbox; +using Proto.Utils; namespace Proto; @@ -20,10 +23,19 @@ internal static void SendSystemMessage(this IEnumerable self, SystemMessage pid.SendSystemMessage(system, message); } } + + public static async Task StopMany(this IEnumerable self, IContext context) + { + foreach (var chunk in self.Chunk(20)) + { + var tasks = chunk.Select(context.StopAsync); + await Task.WhenAll(tasks).WaitUpTo(TimeSpan.FromSeconds(10)); + } + } [UsedImplicitly] public static void Deconstruct( - //DONT TOUCH THIS, it tries to deconstruct the deconstruct method... + //DON'T TOUCH THIS, it tries to deconstruct the deconstruct method... // ReSharper disable once UseDeconstructionOnParameter this KeyValuePair self, out TKey key, diff --git a/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs b/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs index dce6bdc551..6be1212b6c 100644 --- a/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs +++ b/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -17,7 +18,9 @@ namespace Proto.Cluster.Identity; internal class IdentityStoragePlacementActor : IActor { private const int PersistenceRetries = 3; +#pragma warning disable CS0618 // Type or member is obsolete private static readonly ILogger Logger = Log.CreateLogger(); +#pragma warning restore CS0618 // Type or member is obsolete //pid -> the actor that we have created here //kind -> the actor kind @@ -39,7 +42,7 @@ public Task ReceiveAsync(IContext context) => context.Message switch { Started => OnStarted(context), - Stopping _ => Stopping(), + Stopping _ => Stopping(context), Stopped _ => Stopped(), ActivationTerminating msg => OnActivationTerminating(context, msg), ActivationRequest msg => OnActivationRequest(context, msg), @@ -53,12 +56,11 @@ private Task OnStarted(IContext context) return Task.CompletedTask; } - private Task Stopping() + private async Task Stopping(IContext context) { Logger.LogInformation("Stopping placement actor"); _subscription?.Unsubscribe(); - - return Task.CompletedTask; + await _actors.Values.StopMany(context); } private Task Stopped() diff --git a/src/Proto.Cluster/Partition/PartitionPlacementActor.cs b/src/Proto.Cluster/Partition/PartitionPlacementActor.cs index cc544247b5..0efc0e14ff 100644 --- a/src/Proto.Cluster/Partition/PartitionPlacementActor.cs +++ b/src/Proto.Cluster/Partition/PartitionPlacementActor.cs @@ -18,7 +18,9 @@ namespace Proto.Cluster.Partition; internal class PartitionPlacementActor : IActor, IDisposable { +#pragma warning disable CS0618 // Type or member is obsolete private static readonly ILogger Logger = Log.CreateLogger(); +#pragma warning restore CS0618 // Type or member is obsolete //pid -> the actor that we have created here //kind -> the actor kind @@ -40,6 +42,7 @@ public Task ReceiveAsync(IContext context) => context.Message switch { Started => OnStarted(context), + Stopping => OnStopping(context), ActivationTerminating msg => OnActivationTerminating(msg), IdentityHandoverRequest msg => OnIdentityHandoverRequest(context, msg), ClusterTopology msg => OnClusterTopology(context, msg), @@ -47,6 +50,12 @@ public Task ReceiveAsync(IContext context) => _ => Task.CompletedTask }; + private async Task OnStopping(IContext context) + { + var pids = _actors.Values; + await pids.StopMany(context); + } + public void Dispose() => _subscription?.Unsubscribe(); private Task OnClusterTopology(IContext context, ClusterTopology msg) diff --git a/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs b/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs index 3251c2032b..33932e9963 100644 --- a/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs +++ b/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs @@ -46,6 +46,7 @@ public Task ReceiveAsync(IContext context) => context.Message switch { Started => OnStarted(context), + Stopping => OnStopping(context), ActivationRequest msg => OnActivationRequest(msg, context), ActivationTerminated msg => OnActivationTerminated(msg), ActivationTerminating msg => OnActivationTerminating(msg), @@ -53,6 +54,12 @@ public Task ReceiveAsync(IContext context) => _ => Task.CompletedTask }; + private async Task OnStopping(IContext context) + { + var pids = _actors.Values; + await pids.StopMany(context); + } + private Task OnStarted(IContext context) { var self = context.Self; diff --git a/src/Proto.Cluster/PartitionActivator/PartitionActivatorLookup.cs b/src/Proto.Cluster/PartitionActivator/PartitionActivatorLookup.cs index 43762f72a3..12fc0e2446 100644 --- a/src/Proto.Cluster/PartitionActivator/PartitionActivatorLookup.cs +++ b/src/Proto.Cluster/PartitionActivator/PartitionActivatorLookup.cs @@ -122,5 +122,8 @@ public Task SetupAsync(Cluster cluster, string[] kinds, bool isClient) return Task.CompletedTask; } - public Task ShutdownAsync() => _partitionManager.ShutdownAsync(); + public Task ShutdownAsync() + { + return _partitionManager.ShutdownAsync(); + } } \ No newline at end of file