Skip to content

Commit

Permalink
Fixed cluster client mode bug (#2072)
Browse files Browse the repository at this point in the history
* Fixed issue with client cluster logic where it would not see itself as started unless listed by the cluster provider. Changed member list to accept any cluster topology updates when in client mode and set started.

Added tests for client mode requests.

* Tolerate non-completed JoinedCluster task when using StartClientAsync

* Don't test for client functionality when unsupported by provider
  • Loading branch information
mhelleborg authored Nov 16, 2023
1 parent b06ae68 commit 30e6aab
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 40 deletions.
2 changes: 1 addition & 1 deletion src/Proto.Cluster/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ private async Task BeginStartAsync(bool client)
await Remote.StartAsync().ConfigureAwait(false);

Logger.LogInformation("Starting");
MemberList = new MemberList(this);
MemberList = new MemberList(this, client);
_ = MemberList.Started.ContinueWith(_ => _joinedClusterTcs.TrySetResult(true));
ClusterContext = Config.ClusterContextProducer(this);

Expand Down
21 changes: 12 additions & 9 deletions src/Proto.Cluster/Member/MemberList.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ public record MemberList

private TaskCompletionSource<bool> _startedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
private IConsensusHandle<ulong>? _topologyConsensus;
private readonly bool _asClient;

public MemberList(Cluster cluster)
public MemberList(Cluster cluster, bool asClient = false)
{
_cluster = cluster;
_system = _cluster.System;
_root = _system.Root;
_asClient = asClient;
var (host, port) = _cluster.System.GetAddress();

Self = new Member
Expand Down Expand Up @@ -99,9 +101,10 @@ public MemberList(Cluster cluster)
{
SelfBlocked();
}

//only log if the member is known to us
if (TryGetMember(b.MemberId, out _)) {
if (TryGetMember(b.MemberId, out _))
{
Logger.LogInformation("Blocking member {MemberId} due to {Reason}", b.MemberId, b.Reason);
}

Expand Down Expand Up @@ -236,7 +239,7 @@ public void UpdateClusterTopology(IReadOnlyCollection<Member> members)

if (!_startedTcs.Task.IsCompleted)
{
if (activeMembers.Contains(_system.Id))
if (_asClient || activeMembers.Contains(_system.Id))
{
_startedTcs.TrySetResult(true);
}
Expand Down Expand Up @@ -282,23 +285,23 @@ void MemberJoin(Member newMember)
Logger.LogError("Member {Member} already exists in MemberList", newMember);
return;
}

var index = _nextMemberIndex++;
_metaMembers = _metaMembers.SetItem(newMember.Id, new MetaMember(newMember, index));
_membersByIndex = _membersByIndex.SetItem(index, newMember);
_indexByAddress = _indexByAddress.SetItem(newMember.Address, index);

foreach (var kind in newMember.Kinds)
{
if (!_memberStrategyByKind.ContainsKey(kind))
{
_memberStrategyByKind = _memberStrategyByKind.SetItem(kind, GetMemberStrategyByKind(kind));
}

_memberStrategyByKind[kind].AddMember(newMember);
}
}
catch(Exception x)
catch (Exception x)
{
Logger.LogError(x, "Error during MemberJoin {Member}", newMember);
}
Expand Down Expand Up @@ -442,4 +445,4 @@ internal bool TryGetMemberByIndex(int memberIndex, out Member? value) =>
/// <returns></returns>
public Member[] GetMembersByKind(string kind) =>
_activeMembers.Members.Where(m => m.Kinds.Contains(kind)).ToArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ CancellationToken cancellationToken
{
_output.WriteLine($"[{DateTimeOffset.Now:O}] Starting cluster member");

_ = clusterFixture.SpawnNode()
_ = clusterFixture.SpawnMember()
.ContinueWith(
t =>
{
Expand Down
91 changes: 72 additions & 19 deletions tests/Proto.Cluster.Tests/ClusterFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ public interface IClusterFixture
{
IList<Cluster> Members { get; }

bool SupportsClients { get; }

LogStore LogStore { get; }
int ClusterSize { get; }

public Task<Cluster> SpawnNode();
public Task<Cluster> SpawnMember();
public Task<Cluster> SpawnClient();

Task RemoveNode(Cluster member, bool graceful = true);

Expand All @@ -55,11 +58,12 @@ public abstract class ClusterFixture : IAsyncLifetime, IClusterFixture, IAsyncDi
{
private static readonly object Lock = new();


public const string InvalidIdentity = "invalid";
private readonly Func<ClusterConfig, ClusterConfig>? _configure;
private readonly ILogger _logger = Log.CreateLogger(nameof(GetType));
private readonly List<Cluster> _members = new();
private readonly List<Cluster> _clients = new();
private static TracerProvider? _tracerProvider;
private GithubActionsReporter _reporter;

Expand All @@ -69,14 +73,14 @@ static ClusterFixture()
{
TracingSettings.OpenTelemetryUrl = Environment.GetEnvironmentVariable("OPENTELEMETRY_URL");
TracingSettings.TraceViewUrl = Environment.GetEnvironmentVariable("TRACEVIEW_URL");
// TracingSettings.OpenTelemetryUrl = "http://Localhost:4317";
// TracingSettings.OpenTelemetryUrl = "http://Localhost:4317";
TracingSettings.EnableTracing = TracingSettings.OpenTelemetryUrl != null;

//TODO: check if this helps low resource envs like github actions.
ThreadPool.SetMinThreads(40, 40);
}

protected ClusterFixture( int clusterSize, Func<ClusterConfig, ClusterConfig>? configure = null)
protected ClusterFixture(int clusterSize, Func<ClusterConfig, ClusterConfig>? configure = null)
{
_reporter = new GithubActionsReporter(GetType().Name);
ClusterSize = clusterSize;
Expand All @@ -87,11 +91,14 @@ protected ClusterFixture( int clusterSize, Func<ClusterConfig, ClusterConfig>? c
// ReSharper disable once HeuristicUnreachableCode
if (TracingSettings.EnableTracing)
{
InitOpenTelemetryTracing();
InitOpenTelemetryTracing();
}
#pragma warning restore CS0162
}

/// <summary>
/// Indicates whether this fixture supports spawning cluster clients.
/// Defaults to <c>true</c>; derived fixtures may override it.
/// </summary>
public virtual bool SupportsClients => true;


protected virtual ClusterKind[] ClusterKinds => new[]
{
new ClusterKind(EchoActor.Kind, EchoActor.Props.WithClusterRequestDeduplication()),
Expand Down Expand Up @@ -122,7 +129,7 @@ public async Task DisposeAsync()
try
{
_tracerProvider?.ForceFlush();

await _reporter.WriteReportFile();

await OnDisposing();
Expand Down Expand Up @@ -154,11 +161,10 @@ private async Task WaitForMembersToShutdown()
_logger.LogInformation("Shutting down cluster member {MemberId}", cluster.System.Id);

var done = await task.WaitUpTo(TimeSpan.FromSeconds(5));
if (! done)
if (!done)
{
_logger.LogWarning("Failed to shutdown cluster member {MemberId} gracefully", cluster.System.Id);
}

}
catch (Exception e)
{
Expand All @@ -178,6 +184,11 @@ public async Task RemoveNode(Cluster member, bool graceful = true)
Members.Remove(member);
await member.ShutdownAsync(graceful, "Stopped by ClusterFixture");
}
else if (Clients.Contains(member))
{
Clients.Remove(member);
await member.ShutdownAsync(graceful, "Stopped by ClusterFixture");
}
else
{
throw new ArgumentException("No such member");
Expand All @@ -194,15 +205,29 @@ public Task Trace(Func<Task> test, [CallerMemberName] string testName = "")
/// </summary>
/// <returns></returns>
/// <exception cref="ArgumentException"></exception>
public async Task<Cluster> SpawnNode()
public async Task<Cluster> SpawnMember()
{
var newMember = await SpawnClusterMember(_configure);
Members.Add(newMember);
_members.Add(newMember);

return newMember;
}

/// <summary>
/// Spawns a cluster client through <see cref="SpawnClusterClient"/> and records it in the
/// fixture's client list (not the member list).
/// </summary>
/// <returns>The cluster instance returned by <see cref="SpawnClusterClient"/>.</returns>
public async Task<Cluster> SpawnClient()
{
    var client = await SpawnClusterClient(_configure);

    // Track the client so RemoveNode can find it and shut it down later.
    _clients.Add(client);

    return client;
}

/// <summary>Clusters tracked by this fixture as members (backed by <c>_members</c>).</summary>
public IList<Cluster> Members => _members;
/// <summary>Clusters tracked by this fixture as clients (backed by <c>_clients</c>).</summary>
public IList<Cluster> Clients => _clients;

private static void InitOpenTelemetryTracing()
{
Expand All @@ -224,7 +249,7 @@ private static void InitOpenTelemetryTracing()
{
options
.SetResourceBuilder(builder)
.AddOtlpExporter( (OtlpExporterOptions o) =>
.AddOtlpExporter((OtlpExporterOptions o) =>
{
o.Endpoint = endpoint;
o.ExportProcessorType = ExportProcessorType.Batch;
Expand Down Expand Up @@ -262,10 +287,10 @@ private async Task<IList<Cluster>> SpawnClusterNodes(
{
var tasks = Enumerable.Range(0, count)
.Select(_ => SpawnClusterMember(configure));

var res = (await Task.WhenAll(tasks)).ToList();


var consensus = res.Select(m => m.MemberList.TopologyConsensus(CancellationTokens.FromSeconds(10)));
var x = await Task.WhenAll(consensus);
if (x.Any(c => !c.consensus))
Expand Down Expand Up @@ -293,10 +318,7 @@ protected virtual async Task<Cluster> SpawnClusterMember(Func<ClusterConfig, Clu

var logger = system.Logger()?.BeginScope<EventStream>();

system.EventStream.Subscribe<object>(e =>
{
logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e);
}
system.EventStream.Subscribe<object>(e => { logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e); }
);

var remoteConfig = GrpcNetRemoteConfig.BindToLocalhost().WithProtoMessages(MessagesReflection.Descriptor);
Expand All @@ -309,6 +331,35 @@ protected virtual async Task<Cluster> SpawnClusterMember(Func<ClusterConfig, Clu
return cluster;
}

/// <summary>
/// Creates an actor system with a gRPC remote bound to localhost, wraps it in a
/// <see cref="Cluster"/> and starts that cluster in client mode.
/// </summary>
/// <param name="configure">
/// Optional hook applied to the base cluster configuration. When it is null, or returns null,
/// the base configuration is used unchanged.
/// </param>
/// <returns>The cluster, once <c>StartClientAsync</c> has completed.</returns>
protected virtual async Task<Cluster> SpawnClusterClient(Func<ClusterConfig, ClusterConfig>? configure)
{
    var baseConfig = ClusterConfig
        .Setup(ClusterName, GetClusterProvider(), GetIdentityLookup(ClusterName))
        .WithHeartbeatExpiration(TimeSpan.Zero);
    var clusterConfig = configure?.Invoke(baseConfig) ?? baseConfig;

    var actorSystem = new ActorSystem(GetActorSystemConfig());
    actorSystem.Extensions.Register(new InstanceLogger(LogLevel.Debug, LogStore, category: actorSystem.Id));

    // Mirror every event-stream message into the per-instance debug log.
    var eventLogger = actorSystem.Logger()?.BeginScope<EventStream>();
    actorSystem.EventStream.Subscribe<object>(evt =>
        {
            eventLogger?.LogDebug("EventStream {MessageType}:{MessagePayload}", evt.GetType().Name, evt);
        }
    );

    // The remote is constructed for its effect on the actor system; the instance itself is not used here.
    var remoteConfig = GrpcNetRemoteConfig.BindToLocalhost().WithProtoMessages(MessagesReflection.Descriptor);
    _ = new GrpcNetRemote(actorSystem, remoteConfig);

    var client = new Cluster(actorSystem, clusterConfig);
    await client.StartClientAsync();

    return client;
}

protected virtual ActorSystemConfig GetActorSystemConfig()
{
var actorSystemConfig = ActorSystemConfig.Setup();
Expand All @@ -317,7 +368,7 @@ protected virtual ActorSystemConfig GetActorSystemConfig()
return TracingSettings.EnableTracing
? actorSystemConfig
.WithConfigureProps(props => props.WithTracing().WithLoggingContextDecorator(_logger).WithLoggingContextDecorator(_logger))
.WithConfigureSystemProps((name,props) =>
.WithConfigureSystemProps((name, props) =>
{
// if (name == "$gossip")
// return props;
Expand Down Expand Up @@ -427,4 +478,6 @@ public SingleNodeProviderFixture() : base(1, config => config.WithActorRequestTi
protected override IClusterProvider GetClusterProvider() => new SingleNodeProvider();

protected override IIdentityLookup GetIdentityLookup(string clusterName) => new SingleNodeLookup();

// Opts this fixture (which uses SingleNodeProvider) out of client-mode tests.
public override bool SupportsClients => false;
}
45 changes: 37 additions & 8 deletions tests/Proto.Cluster.Tests/ClusterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ protected ClusterTests(ITestOutputHelper testOutputHelper, IClusterFixture clust
[Fact]
public void ClusterMembersMatch()
{

var memberSet = Members.First().MemberList.GetMembers();

memberSet.Should().NotBeEmpty();
Expand All @@ -51,16 +50,46 @@ await Trace(async () =>
_testOutputHelper.WriteLine($"Spawned 1 actor in {timer.Elapsed}");
}, _testOutputHelper);
}


// Verifies that a node started in client mode completes JoinedCluster and can do a
// ping-pong round trip against the cluster. Does nothing when the fixture reports
// that clients are unsupported.
[Fact]
public async Task ClientsCanCallCluster()
{
    if (!ClusterFixture.SupportsClients)
    {
        return;
    }

    await Trace(async () =>
    {
        // Own the source so it is disposed when the test body finishes.
        using var timeoutSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var timeout = timeoutSource.Token;

        var clientNode = await ClusterFixture.SpawnClient();

        try
        {
            await clientNode.JoinedCluster.WaitAsync(timeout);
            clientNode.JoinedCluster.IsCompletedSuccessfully.Should().BeTrue();

            var timer = Stopwatch.StartNew();
            await PingPong(clientNode, "client-unicorn", timeout);
            timer.Stop();
            _testOutputHelper.WriteLine($"Spawned 1 actor in {timer.Elapsed}");
        }
        finally
        {
            // Remove the client on success as well as on failure; otherwise a passing run
            // leaves the client node running and registered in the fixture.
            await ClusterFixture.RemoveNode(clientNode);
        }
    }, _testOutputHelper);
}

[Fact]
public async Task TopologiesShouldHaveConsensus()
{
await Trace(async () =>
{
var consensus = await Task
.WhenAll(Members.Select(member =>
member.MemberList.TopologyConsensus(CancellationTokens.FromSeconds(20))))
.WaitUpTo(TimeSpan.FromSeconds(20))
.WhenAll(Members.Select(member =>
member.MemberList.TopologyConsensus(CancellationTokens.FromSeconds(20))))
.WaitUpTo(TimeSpan.FromSeconds(20))
;

_testOutputHelper.WriteLine(await Members.DumpClusterState());
Expand Down Expand Up @@ -217,7 +246,7 @@ await Trace(async () =>
_testOutputHelper.WriteLine("Removing node " + toBeRemoved.System.Id + " / " + toBeRemoved.System.Address);
await ClusterFixture.RemoveNode(toBeRemoved);
_testOutputHelper.WriteLine("Removed node " + toBeRemoved.System.Id + " / " + toBeRemoved.System.Address);
await ClusterFixture.SpawnNode();
await ClusterFixture.SpawnMember();

await CanGetResponseFromAllIdsOnAllNodes(ids, Members, 20000);

Expand Down Expand Up @@ -256,7 +285,7 @@ await Trace(async () =>
_testOutputHelper.WriteLine("Terminating node");
await ClusterFixture.RemoveNode(victim);
_testOutputHelper.WriteLine("Spawning node");
await ClusterFixture.SpawnNode();
await ClusterFixture.SpawnMember();
await Task.Delay(1000);
cts.Cancel();
await worker;
Expand Down Expand Up @@ -468,7 +497,7 @@ private async Task PingPong(
string id,
CancellationToken token = default,
string kind = EchoActor.Kind,
ISenderContext context= null
ISenderContext context = null
)
{
await Task.Yield();
Expand Down
Loading

0 comments on commit 30e6aab

Please sign in to comment.