From 2e0acd13c05bffd54d2e84cd394e94844fe778be Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Wed, 15 Nov 2023 22:05:14 +0100 Subject: [PATCH 1/3] Fixed issue with client cluster logic where it would not see itself as started unless listed by the cluster provider. Changed member list to accept any cluster topology updates when in client mode and set started. Added tests for client mode requests. --- src/Proto.Cluster/Cluster.cs | 2 +- src/Proto.Cluster/Member/MemberList.cs | 21 ++++--- .../PartitionIdentityTests.cs | 2 +- tests/Proto.Cluster.Tests/ClusterFixture.cs | 61 +++++++++++++++++-- tests/Proto.Cluster.Tests/ClusterTests.cs | 31 +++++++++- tests/Proto.Cluster.Tests/GossipTests.cs | 2 +- .../RetryOnDeadLetterTests.cs | 2 +- 7 files changed, 102 insertions(+), 19 deletions(-) diff --git a/src/Proto.Cluster/Cluster.cs b/src/Proto.Cluster/Cluster.cs index 83cf7d952d..688b7185e7 100644 --- a/src/Proto.Cluster/Cluster.cs +++ b/src/Proto.Cluster/Cluster.cs @@ -200,7 +200,7 @@ private async Task BeginStartAsync(bool client) await Remote.StartAsync().ConfigureAwait(false); Logger.LogInformation("Starting"); - MemberList = new MemberList(this); + MemberList = new MemberList(this, client); _ = MemberList.Started.ContinueWith(_ => _joinedClusterTcs.TrySetResult(true)); ClusterContext = Config.ClusterContextProducer(this); diff --git a/src/Proto.Cluster/Member/MemberList.cs b/src/Proto.Cluster/Member/MemberList.cs index 87acf8e922..4e328fd836 100644 --- a/src/Proto.Cluster/Member/MemberList.cs +++ b/src/Proto.Cluster/Member/MemberList.cs @@ -60,12 +60,14 @@ public record MemberList private TaskCompletionSource _startedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); private IConsensusHandle? _topologyConsensus; + private readonly bool _asClient; - public MemberList(Cluster cluster) + public MemberList(Cluster cluster, bool asClient = false) { _cluster = cluster; _system = _cluster.System; _root = _system.Root; + _asClient = asClient; var (host, port) = _cluster.System.GetAddress(); Self = new Member @@ -99,9 +101,10 @@ public MemberList(Cluster cluster) { SelfBlocked(); } - + //only log if the member is known to us - if (TryGetMember(b.MemberId, out _)) { + if (TryGetMember(b.MemberId, out _)) + { Logger.LogInformation("Blocking member {MemberId} due to {Reason}", b.MemberId, b.Reason); } @@ -236,7 +239,7 @@ public void UpdateClusterTopology(IReadOnlyCollection members) if (!_startedTcs.Task.IsCompleted) { - if (activeMembers.Contains(_system.Id)) + if (_asClient || activeMembers.Contains(_system.Id)) { _startedTcs.TrySetResult(true); } @@ -282,23 +285,23 @@ void MemberJoin(Member newMember) Logger.LogError("Member {Member} already exists in MemberList", newMember); return; } - + var index = _nextMemberIndex++; _metaMembers = _metaMembers.SetItem(newMember.Id, new MetaMember(newMember, index)); _membersByIndex = _membersByIndex.SetItem(index, newMember); _indexByAddress = _indexByAddress.SetItem(newMember.Address, index); - + foreach (var kind in newMember.Kinds) { if (!_memberStrategyByKind.ContainsKey(kind)) { _memberStrategyByKind = _memberStrategyByKind.SetItem(kind, GetMemberStrategyByKind(kind)); } - + _memberStrategyByKind[kind].AddMember(newMember); } } - catch(Exception x) + catch (Exception x) { Logger.LogError(x, "Error during MemberJoin {Member}", newMember); } @@ -442,4 +445,4 @@ internal bool TryGetMemberByIndex(int memberIndex, out Member? value) => /// public Member[] GetMembersByKind(string kind) => _activeMembers.Members.Where(m => m.Kinds.Contains(kind)).ToArray(); -} +} \ No newline at end of file diff --git a/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs b/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs index d8ca8207f1..5cb0bc0d66 100644 --- a/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs +++ b/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs @@ -209,7 +209,7 @@ CancellationToken cancellationToken { _output.WriteLine($"[{DateTimeOffset.Now:O}] Starting cluster member"); - _ = clusterFixture.SpawnNode() + _ = clusterFixture.SpawnMember() .ContinueWith( t => { diff --git a/tests/Proto.Cluster.Tests/ClusterFixture.cs b/tests/Proto.Cluster.Tests/ClusterFixture.cs index d4c0ef7141..6f8ff65dbe 100644 --- a/tests/Proto.Cluster.Tests/ClusterFixture.cs +++ b/tests/Proto.Cluster.Tests/ClusterFixture.cs @@ -37,7 +37,8 @@ public interface IClusterFixture LogStore LogStore { get; } int ClusterSize { get; } - public Task SpawnNode(); + public Task SpawnMember(); + public Task SpawnClient(); Task RemoveNode(Cluster member, bool graceful = true); @@ -60,6 +61,7 @@ public abstract class ClusterFixture : IAsyncLifetime, IClusterFixture, IAsyncDi private readonly Func? _configure; private readonly ILogger _logger = Log.CreateLogger(nameof(GetType)); private readonly List _members = new(); + private readonly List _clients = new(); private static TracerProvider? _tracerProvider; private GithubActionsReporter _reporter; @@ -178,6 +180,11 @@ public async Task RemoveNode(Cluster member, bool graceful = true) Members.Remove(member); await member.ShutdownAsync(graceful, "Stopped by ClusterFixture"); } + else if (Clients.Contains(member)) + { + Clients.Remove(member); + await member.ShutdownAsync(graceful, "Stopped by ClusterFixture"); + } else { throw new ArgumentException("No such member"); @@ -194,15 +201,29 @@ public Task Trace(Func test, [CallerMemberName] string testName = "") /// /// /// - public async Task SpawnNode() + public async Task SpawnMember() { var newMember = await SpawnClusterMember(_configure); - Members.Add(newMember); + _members.Add(newMember); + + return newMember; + } + + /// + /// Spawns a node, adds it to the cluster and member list + /// + /// + /// + public async Task SpawnClient() + { + var newMember = await SpawnClusterClient(_configure); + _clients.Add(newMember); return newMember; } public IList Members => _members; + public IList Clients => _clients; private static void InitOpenTelemetryTracing() { @@ -303,11 +324,43 @@ protected virtual async Task SpawnClusterMember(Func SpawnClusterClient(Func? configure) + { + var config = ClusterConfig.Setup( + ClusterName, + GetClusterProvider(), + GetIdentityLookup(ClusterName) + ) + .WithHeartbeatExpiration(TimeSpan.Zero); + + config = configure?.Invoke(config) ?? config; + + var system = new ActorSystem(GetActorSystemConfig()); + system.Extensions.Register(new InstanceLogger(LogLevel.Debug, LogStore, category: system.Id)); + + var logger = system.Logger()?.BeginScope(); + + system.EventStream.Subscribe(e => + { + logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e); + } + ); + + var remoteConfig = GrpcNetRemoteConfig.BindToLocalhost().WithProtoMessages(MessagesReflection.Descriptor); + var _ = new GrpcNetRemote(system, remoteConfig); + + var cluster = new Cluster(system, config); + + await cluster.StartClientAsync(); + + return cluster; + } protected virtual ActorSystemConfig GetActorSystemConfig() { diff --git a/tests/Proto.Cluster.Tests/ClusterTests.cs b/tests/Proto.Cluster.Tests/ClusterTests.cs index f640f25e20..41241c3ea0 100644 --- a/tests/Proto.Cluster.Tests/ClusterTests.cs +++ b/tests/Proto.Cluster.Tests/ClusterTests.cs @@ -52,6 +52,33 @@ await Trace(async () => }, _testOutputHelper); } + [Fact] + public async Task ClientsCanCallCluster() + { + await Trace(async () => + { + var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token; + + var clientNode = await ClusterFixture.SpawnClient(); + + try + { + clientNode.JoinedCluster.IsCompletedSuccessfully.Should().BeTrue(); + + var timer = Stopwatch.StartNew(); + await PingPong(clientNode, "client-unicorn", timeout); + timer.Stop(); + _testOutputHelper.WriteLine($"Spawned 1 actor in {timer.Elapsed}"); + } + catch + { + await ClusterFixture.RemoveNode(clientNode); + throw; + } + + }, _testOutputHelper); + } + [Fact] public async Task TopologiesShouldHaveConsensus() { @@ -217,7 +244,7 @@ await Trace(async () => _testOutputHelper.WriteLine("Removing node " + toBeRemoved.System.Id + " / " + toBeRemoved.System.Address); await ClusterFixture.RemoveNode(toBeRemoved); _testOutputHelper.WriteLine("Removed node " + toBeRemoved.System.Id + " / " + toBeRemoved.System.Address); - await ClusterFixture.SpawnNode(); + await ClusterFixture.SpawnMember(); await CanGetResponseFromAllIdsOnAllNodes(ids, Members, 20000); @@ -256,7 +283,7 @@ await Trace(async () => _testOutputHelper.WriteLine("Terminating node"); await ClusterFixture.RemoveNode(victim); _testOutputHelper.WriteLine("Spawning node"); - await ClusterFixture.SpawnNode(); + await ClusterFixture.SpawnMember(); await Task.Delay(1000); cts.Cancel(); await worker; diff --git a/tests/Proto.Cluster.Tests/GossipTests.cs b/tests/Proto.Cluster.Tests/GossipTests.cs index 6dbb495b1b..33bffb7f04 100644 --- a/tests/Proto.Cluster.Tests/GossipTests.cs +++ b/tests/Proto.Cluster.Tests/GossipTests.cs @@ -82,7 +82,7 @@ public async Task CompositeConsensusWorks() afterSettingMatchingState.value.Should().Be(initialTopologyHash); - await clusterFixture.SpawnNode(); + await clusterFixture.SpawnMember(); await Task.Delay(2000); // Allow topology state to propagate var afterChangingTopology = diff --git a/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs b/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs index 6760d7e942..fb9f47dd57 100644 --- a/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs +++ b/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs @@ -24,7 +24,7 @@ public async Task ShouldRetryRequestOnDeadLetterResponseRegardlessOfResponseType await member.RequestAsync(identity, EchoActor.Kind, new Ping(), CancellationTokens.FromSeconds(1)); // pretend we have an invalid PID in the cache - var otherMember = await fixture.SpawnNode(); + var otherMember = await fixture.SpawnMember(); if (member.PidCache.TryGet(ClusterIdentity.Create(identity, EchoActor.Kind), out var pid)) { var newPid = PID.FromAddress(otherMember.System.Address, pid.Id); From c504efb081712cc422d6ff7018fa9c76ac907468 Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Wed, 15 Nov 2023 22:15:50 +0100 Subject: [PATCH 2/3] Tolerate non-completed JoinedCluster task when using StartClientAsync --- tests/Proto.Cluster.Tests/ClusterTests.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/Proto.Cluster.Tests/ClusterTests.cs b/tests/Proto.Cluster.Tests/ClusterTests.cs index 41241c3ea0..8e694ec48e 100644 --- a/tests/Proto.Cluster.Tests/ClusterTests.cs +++ b/tests/Proto.Cluster.Tests/ClusterTests.cs @@ -63,6 +63,7 @@ await Trace(async () => try { + await clientNode.JoinedCluster.WaitAsync(timeout); clientNode.JoinedCluster.IsCompletedSuccessfully.Should().BeTrue(); var timer = Stopwatch.StartNew(); From 6b6b75872ec73f3ce6a7aef8ff0cd1b0eeffd5d6 Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Wed, 15 Nov 2023 22:31:09 +0100 Subject: [PATCH 3/3] Don't test for client functionality when unsupported by provider --- tests/Proto.Cluster.Tests/ClusterFixture.cs | 48 ++++++++++----------- tests/Proto.Cluster.Tests/ClusterTests.cs | 21 ++++----- 2 files changed, 35 insertions(+), 34 deletions(-) diff --git a/tests/Proto.Cluster.Tests/ClusterFixture.cs b/tests/Proto.Cluster.Tests/ClusterFixture.cs index 6f8ff65dbe..05b184a516 100644 --- a/tests/Proto.Cluster.Tests/ClusterFixture.cs +++ b/tests/Proto.Cluster.Tests/ClusterFixture.cs @@ -34,6 +34,8 @@ public interface IClusterFixture { IList Members { get; } + bool SupportsClients { get; } + LogStore LogStore { get; } int ClusterSize { get; } @@ -56,7 +58,7 @@ public abstract class ClusterFixture : IAsyncLifetime, IClusterFixture, IAsyncDi { private static readonly object Lock = new(); - + public const string InvalidIdentity = "invalid"; private readonly Func? _configure; private readonly ILogger _logger = Log.CreateLogger(nameof(GetType)); @@ -71,14 +73,14 @@ static ClusterFixture() { TracingSettings.OpenTelemetryUrl = Environment.GetEnvironmentVariable("OPENTELEMETRY_URL"); TracingSettings.TraceViewUrl = Environment.GetEnvironmentVariable("TRACEVIEW_URL"); - // TracingSettings.OpenTelemetryUrl = "http://Localhost:4317"; + // TracingSettings.OpenTelemetryUrl = "http://Localhost:4317"; TracingSettings.EnableTracing = TracingSettings.OpenTelemetryUrl != null; //TODO: check if this helps low resource envs like github actions. ThreadPool.SetMinThreads(40, 40); } - protected ClusterFixture( int clusterSize, Func? configure = null) + protected ClusterFixture(int clusterSize, Func? configure = null) { _reporter = new GithubActionsReporter(GetType().Name); ClusterSize = clusterSize; @@ -89,11 +91,14 @@ protected ClusterFixture( int clusterSize, Func? c // ReSharper disable once HeuristicUnreachableCode if (TracingSettings.EnableTracing) { - InitOpenTelemetryTracing(); + InitOpenTelemetryTracing(); } #pragma warning restore CS0162 } + public virtual bool SupportsClients => true; + + protected virtual ClusterKind[] ClusterKinds => new[] { new ClusterKind(EchoActor.Kind, EchoActor.Props.WithClusterRequestDeduplication()), @@ -124,7 +129,7 @@ public async Task DisposeAsync() try { _tracerProvider?.ForceFlush(); - + await _reporter.WriteReportFile(); await OnDisposing(); @@ -156,11 +161,10 @@ private async Task WaitForMembersToShutdown() _logger.LogInformation("Shutting down cluster member {MemberId}", cluster.System.Id); var done = await task.WaitUpTo(TimeSpan.FromSeconds(5)); - if (! done) + if (!done) { _logger.LogWarning("Failed to shutdown cluster member {MemberId} gracefully", cluster.System.Id); } - } catch (Exception e) { @@ -208,7 +212,7 @@ public async Task SpawnMember() return newMember; } - + /// /// Spawns a node, adds it to the cluster and member list /// @@ -245,7 +249,7 @@ private static void InitOpenTelemetryTracing() { options .SetResourceBuilder(builder) - .AddOtlpExporter( (OtlpExporterOptions o) => + .AddOtlpExporter((OtlpExporterOptions o) => { o.Endpoint = endpoint; o.ExportProcessorType = ExportProcessorType.Batch; @@ -283,10 +287,10 @@ private async Task> SpawnClusterNodes( { var tasks = Enumerable.Range(0, count) .Select(_ => SpawnClusterMember(configure)); - + var res = (await Task.WhenAll(tasks)).ToList(); - - + + var consensus = res.Select(m => m.MemberList.TopologyConsensus(CancellationTokens.FromSeconds(10))); var x = await Task.WhenAll(consensus); if (x.Any(c => !c.consensus)) @@ -314,22 +318,19 @@ protected virtual async Task SpawnClusterMember(Func(); - system.EventStream.Subscribe(e => - { - logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e); - } + system.EventStream.Subscribe(e => { logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e); } ); var remoteConfig = GrpcNetRemoteConfig.BindToLocalhost().WithProtoMessages(MessagesReflection.Descriptor); var _ = new GrpcNetRemote(system, remoteConfig); var cluster = new Cluster(system, config); - + await cluster.StartMemberAsync(); return cluster; } - + protected virtual async Task SpawnClusterClient(Func? configure) { var config = ClusterConfig.Setup( @@ -346,17 +347,14 @@ protected virtual async Task SpawnClusterClient(Func(); - system.EventStream.Subscribe(e => - { - logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e); - } + system.EventStream.Subscribe(e => { logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e); } ); var remoteConfig = GrpcNetRemoteConfig.BindToLocalhost().WithProtoMessages(MessagesReflection.Descriptor); var _ = new GrpcNetRemote(system, remoteConfig); var cluster = new Cluster(system, config); - + await cluster.StartClientAsync(); return cluster; @@ -370,7 +368,7 @@ protected virtual ActorSystemConfig GetActorSystemConfig() return TracingSettings.EnableTracing ? actorSystemConfig .WithConfigureProps(props => props.WithTracing().WithLoggingContextDecorator(_logger).WithLoggingContextDecorator(_logger)) - .WithConfigureSystemProps((name,props) => + .WithConfigureSystemProps((name, props) => { // if (name == "$gossip") // return props; @@ -480,4 +478,6 @@ public SingleNodeProviderFixture() : base(1, config => config.WithActorRequestTi protected override IClusterProvider GetClusterProvider() => new SingleNodeProvider(); protected override IIdentityLookup GetIdentityLookup(string clusterName) => new SingleNodeLookup(); + + public override bool SupportsClients => false; } \ No newline at end of file diff --git a/tests/Proto.Cluster.Tests/ClusterTests.cs b/tests/Proto.Cluster.Tests/ClusterTests.cs index 8e694ec48e..76d1cb9032 100644 --- a/tests/Proto.Cluster.Tests/ClusterTests.cs +++ b/tests/Proto.Cluster.Tests/ClusterTests.cs @@ -28,7 +28,6 @@ protected ClusterTests(ITestOutputHelper testOutputHelper, IClusterFixture clust [Fact] public void ClusterMembersMatch() { - var memberSet = Members.First().MemberList.GetMembers(); memberSet.Should().NotBeEmpty(); @@ -51,21 +50,24 @@ await Trace(async () => _testOutputHelper.WriteLine($"Spawned 1 actor in {timer.Elapsed}"); }, _testOutputHelper); } - + [Fact] public async Task ClientsCanCallCluster() { + if (!ClusterFixture.SupportsClients) + return; + await Trace(async () => { var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token; - var clientNode = await ClusterFixture.SpawnClient(); + var clientNode = await ClusterFixture.SpawnClient(); try { await clientNode.JoinedCluster.WaitAsync(timeout); clientNode.JoinedCluster.IsCompletedSuccessfully.Should().BeTrue(); - + var timer = Stopwatch.StartNew(); await PingPong(clientNode, "client-unicorn", timeout); timer.Stop(); @@ -76,19 +78,18 @@ await Trace(async () => await ClusterFixture.RemoveNode(clientNode); throw; } - }, _testOutputHelper); } - + [Fact] public async Task TopologiesShouldHaveConsensus() { await Trace(async () => { var consensus = await Task - .WhenAll(Members.Select(member => - member.MemberList.TopologyConsensus(CancellationTokens.FromSeconds(20)))) - .WaitUpTo(TimeSpan.FromSeconds(20)) + .WhenAll(Members.Select(member => + member.MemberList.TopologyConsensus(CancellationTokens.FromSeconds(20)))) + .WaitUpTo(TimeSpan.FromSeconds(20)) ; _testOutputHelper.WriteLine(await Members.DumpClusterState()); @@ -496,7 +497,7 @@ private async Task PingPong( string id, CancellationToken token = default, string kind = EchoActor.Kind, - ISenderContext context= null + ISenderContext context = null ) { await Task.Yield();