Skip to content

Commit

Permalink
Graceful stop (#2121)
Browse files Browse the repository at this point in the history
* graceful stop of cluster actors
  • Loading branch information
rogeralsing authored May 3, 2024
1 parent d300571 commit e0444e6
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/Proto.Actor/EventStream/EventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public EventStreamSubscription<T> Subscribe<TMsg>(
/// <returns>A new subscription that can be used to unsubscribe</returns>
public EventStreamSubscription<T> Subscribe<TMsg>(ISenderContext context, params PID[] pids) where TMsg : T
{
var caller = pids.First().ToDiagnosticString();
var caller = pids.First().ToDiagnosticString().Split("/").Last();
var sub = new EventStreamSubscription<T>(
this,
Dispatchers.SynchronousDispatcher,
Expand Down
14 changes: 13 additions & 1 deletion src/Proto.Actor/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Proto.Mailbox;
using Proto.Utils;

namespace Proto;

Expand All @@ -20,10 +23,19 @@ internal static void SendSystemMessage(this IEnumerable<PID> self, SystemMessage
pid.SendSystemMessage(system, message);
}
}

public static async Task StopMany(this IEnumerable<PID> self, IContext context)
{
foreach (var chunk in self.Chunk(20))
{
var tasks = chunk.Select(context.StopAsync);
await Task.WhenAll(tasks).WaitUpTo(TimeSpan.FromSeconds(10));
}
}

[UsedImplicitly]
public static void Deconstruct<TKey, TValue>(
//DONT TOUCH THIS, it tries to deconstruct the deconstruct method...
//DON'T TOUCH THIS, it tries to deconstruct the deconstruct method...
// ReSharper disable once UseDeconstructionOnParameter
this KeyValuePair<TKey, TValue> self,
out TKey key,
Expand Down
10 changes: 6 additions & 4 deletions src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand All @@ -17,7 +18,9 @@ namespace Proto.Cluster.Identity;
internal class IdentityStoragePlacementActor : IActor
{
private const int PersistenceRetries = 3;
#pragma warning disable CS0618 // Type or member is obsolete
private static readonly ILogger Logger = Log.CreateLogger<IdentityStoragePlacementActor>();
#pragma warning restore CS0618 // Type or member is obsolete

//pid -> the actor that we have created here
//kind -> the actor kind
Expand All @@ -39,7 +42,7 @@ public Task ReceiveAsync(IContext context) =>
context.Message switch
{
Started => OnStarted(context),
Stopping _ => Stopping(),
Stopping _ => Stopping(context),
Stopped _ => Stopped(),
ActivationTerminating msg => OnActivationTerminating(context, msg),
ActivationRequest msg => OnActivationRequest(context, msg),
Expand All @@ -53,12 +56,11 @@ private Task OnStarted(IContext context)
return Task.CompletedTask;
}

private Task Stopping()
private async Task Stopping(IContext context)
{
Logger.LogInformation("Stopping placement actor");
_subscription?.Unsubscribe();

return Task.CompletedTask;
await _actors.Values.StopMany(context);
}

private Task Stopped()
Expand Down
9 changes: 9 additions & 0 deletions src/Proto.Cluster/Partition/PartitionPlacementActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ namespace Proto.Cluster.Partition;

internal class PartitionPlacementActor : IActor, IDisposable
{
#pragma warning disable CS0618 // Type or member is obsolete
private static readonly ILogger Logger = Log.CreateLogger<PartitionPlacementActor>();
#pragma warning restore CS0618 // Type or member is obsolete

//pid -> the actor that we have created here
//kind -> the actor kind
Expand All @@ -40,13 +42,20 @@ public Task ReceiveAsync(IContext context) =>
context.Message switch
{
Started => OnStarted(context),
Stopping => OnStopping(context),
ActivationTerminating msg => OnActivationTerminating(msg),
IdentityHandoverRequest msg => OnIdentityHandoverRequest(context, msg),
ClusterTopology msg => OnClusterTopology(context, msg),
ActivationRequest msg => OnActivationRequest(context, msg),
_ => Task.CompletedTask
};

private async Task OnStopping(IContext context)
{
var pids = _actors.Values;
await pids.StopMany(context);
}

public void Dispose() => _subscription?.Unsubscribe();

private Task OnClusterTopology(IContext context, ClusterTopology msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,20 @@ public Task ReceiveAsync(IContext context) =>
context.Message switch
{
Started => OnStarted(context),
Stopping => OnStopping(context),
ActivationRequest msg => OnActivationRequest(msg, context),
ActivationTerminated msg => OnActivationTerminated(msg),
ActivationTerminating msg => OnActivationTerminating(msg),
ClusterTopology msg => OnClusterTopology(msg, context),
_ => Task.CompletedTask
};

private async Task OnStopping(IContext context)
{
var pids = _actors.Values;
await pids.StopMany(context);
}

private Task OnStarted(IContext context)
{
var self = context.Self;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,5 +122,8 @@ public Task SetupAsync(Cluster cluster, string[] kinds, bool isClient)
return Task.CompletedTask;
}

public Task ShutdownAsync() => _partitionManager.ShutdownAsync();
public Task ShutdownAsync()
{
return _partitionManager.ShutdownAsync();
}
}

0 comments on commit e0444e6

Please sign in to comment.