From 30e6aaba309154039fb738d75b4096144bcd3927 Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Thu, 16 Nov 2023 13:27:59 +0100 Subject: [PATCH] Fixed cluster client mode bug (#2072) * Fixed issue with client cluster logic where it would not see itself as started unless listed by the cluster provider. Changed member list to accept any cluster topology updates when in client mode and set started. Added tests for client mode requests. * Tolerate non-completed JoinedCluster task when using StartClientAsync * Don't test for client functionality when unsupported by provider --- src/Proto.Cluster/Cluster.cs | 2 +- src/Proto.Cluster/Member/MemberList.cs | 21 +++-- .../PartitionIdentityTests.cs | 2 +- tests/Proto.Cluster.Tests/ClusterFixture.cs | 91 +++++++++++++++---- tests/Proto.Cluster.Tests/ClusterTests.cs | 45 +++++++-- tests/Proto.Cluster.Tests/GossipTests.cs | 2 +- .../RetryOnDeadLetterTests.cs | 2 +- 7 files changed, 125 insertions(+), 40 deletions(-) diff --git a/src/Proto.Cluster/Cluster.cs b/src/Proto.Cluster/Cluster.cs index 83cf7d952d..688b7185e7 100644 --- a/src/Proto.Cluster/Cluster.cs +++ b/src/Proto.Cluster/Cluster.cs @@ -200,7 +200,7 @@ private async Task BeginStartAsync(bool client) await Remote.StartAsync().ConfigureAwait(false); Logger.LogInformation("Starting"); - MemberList = new MemberList(this); + MemberList = new MemberList(this, client); _ = MemberList.Started.ContinueWith(_ => _joinedClusterTcs.TrySetResult(true)); ClusterContext = Config.ClusterContextProducer(this); diff --git a/src/Proto.Cluster/Member/MemberList.cs b/src/Proto.Cluster/Member/MemberList.cs index 87acf8e922..4e328fd836 100644 --- a/src/Proto.Cluster/Member/MemberList.cs +++ b/src/Proto.Cluster/Member/MemberList.cs @@ -60,12 +60,14 @@ public record MemberList private TaskCompletionSource _startedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); private IConsensusHandle? _topologyConsensus; + private readonly bool _asClient; - public MemberList(Cluster cluster) + public MemberList(Cluster cluster, bool asClient = false) { _cluster = cluster; _system = _cluster.System; _root = _system.Root; + _asClient = asClient; var (host, port) = _cluster.System.GetAddress(); Self = new Member @@ -99,9 +101,10 @@ public MemberList(Cluster cluster) { SelfBlocked(); } - + //only log if the member is known to us - if (TryGetMember(b.MemberId, out _)) { + if (TryGetMember(b.MemberId, out _)) + { Logger.LogInformation("Blocking member {MemberId} due to {Reason}", b.MemberId, b.Reason); } @@ -236,7 +239,7 @@ public void UpdateClusterTopology(IReadOnlyCollection members) if (!_startedTcs.Task.IsCompleted) { - if (activeMembers.Contains(_system.Id)) + if (_asClient || activeMembers.Contains(_system.Id)) { _startedTcs.TrySetResult(true); } @@ -282,23 +285,23 @@ void MemberJoin(Member newMember) Logger.LogError("Member {Member} already exists in MemberList", newMember); return; } - + var index = _nextMemberIndex++; _metaMembers = _metaMembers.SetItem(newMember.Id, new MetaMember(newMember, index)); _membersByIndex = _membersByIndex.SetItem(index, newMember); _indexByAddress = _indexByAddress.SetItem(newMember.Address, index); - + foreach (var kind in newMember.Kinds) { if (!_memberStrategyByKind.ContainsKey(kind)) { _memberStrategyByKind = _memberStrategyByKind.SetItem(kind, GetMemberStrategyByKind(kind)); } - + _memberStrategyByKind[kind].AddMember(newMember); } } - catch(Exception x) + catch (Exception x) { Logger.LogError(x, "Error during MemberJoin {Member}", newMember); } @@ -442,4 +445,4 @@ internal bool TryGetMemberByIndex(int memberIndex, out Member? value) => /// public Member[] GetMembersByKind(string kind) => _activeMembers.Members.Where(m => m.Kinds.Contains(kind)).ToArray(); -} +} \ No newline at end of file diff --git a/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs b/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs index d8ca8207f1..5cb0bc0d66 100644 --- a/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs +++ b/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs @@ -209,7 +209,7 @@ CancellationToken cancellationToken { _output.WriteLine($"[{DateTimeOffset.Now:O}] Starting cluster member"); - _ = clusterFixture.SpawnNode() + _ = clusterFixture.SpawnMember() .ContinueWith( t => { diff --git a/tests/Proto.Cluster.Tests/ClusterFixture.cs b/tests/Proto.Cluster.Tests/ClusterFixture.cs index d4c0ef7141..05b184a516 100644 --- a/tests/Proto.Cluster.Tests/ClusterFixture.cs +++ b/tests/Proto.Cluster.Tests/ClusterFixture.cs @@ -34,10 +34,13 @@ public interface IClusterFixture { IList Members { get; } + bool SupportsClients { get; } + LogStore LogStore { get; } int ClusterSize { get; } - public Task SpawnNode(); + public Task SpawnMember(); + public Task SpawnClient(); Task RemoveNode(Cluster member, bool graceful = true); @@ -55,11 +58,12 @@ public abstract class ClusterFixture : IAsyncLifetime, IClusterFixture, IAsyncDi { private static readonly object Lock = new(); - + public const string InvalidIdentity = "invalid"; private readonly Func? _configure; private readonly ILogger _logger = Log.CreateLogger(nameof(GetType)); private readonly List _members = new(); + private readonly List _clients = new(); private static TracerProvider? _tracerProvider; private GithubActionsReporter _reporter; @@ -69,14 +73,14 @@ static ClusterFixture() { TracingSettings.OpenTelemetryUrl = Environment.GetEnvironmentVariable("OPENTELEMETRY_URL"); TracingSettings.TraceViewUrl = Environment.GetEnvironmentVariable("TRACEVIEW_URL"); - // TracingSettings.OpenTelemetryUrl = "http://Localhost:4317"; + // TracingSettings.OpenTelemetryUrl = "http://Localhost:4317"; TracingSettings.EnableTracing = TracingSettings.OpenTelemetryUrl != null; //TODO: check if this helps low resource envs like github actions. ThreadPool.SetMinThreads(40, 40); } - protected ClusterFixture( int clusterSize, Func? configure = null) + protected ClusterFixture(int clusterSize, Func? configure = null) { _reporter = new GithubActionsReporter(GetType().Name); ClusterSize = clusterSize; @@ -87,11 +91,14 @@ protected ClusterFixture( int clusterSize, Func? c // ReSharper disable once HeuristicUnreachableCode if (TracingSettings.EnableTracing) { - InitOpenTelemetryTracing(); + InitOpenTelemetryTracing(); } #pragma warning restore CS0162 } + public virtual bool SupportsClients => true; + + protected virtual ClusterKind[] ClusterKinds => new[] { new ClusterKind(EchoActor.Kind, EchoActor.Props.WithClusterRequestDeduplication()), @@ -122,7 +129,7 @@ public async Task DisposeAsync() try { _tracerProvider?.ForceFlush(); - + await _reporter.WriteReportFile(); await OnDisposing(); @@ -154,11 +161,10 @@ private async Task WaitForMembersToShutdown() _logger.LogInformation("Shutting down cluster member {MemberId}", cluster.System.Id); var done = await task.WaitUpTo(TimeSpan.FromSeconds(5)); - if (! done) + if (!done) { _logger.LogWarning("Failed to shutdown cluster member {MemberId} gracefully", cluster.System.Id); } - } catch (Exception e) { @@ -178,6 +184,11 @@ public async Task RemoveNode(Cluster member, bool graceful = true) Members.Remove(member); await member.ShutdownAsync(graceful, "Stopped by ClusterFixture"); } + else if (Clients.Contains(member)) + { + Clients.Remove(member); + await member.ShutdownAsync(graceful, "Stopped by ClusterFixture"); + } else { throw new ArgumentException("No such member"); @@ -194,15 +205,29 @@ public Task Trace(Func test, [CallerMemberName] string testName = "") /// /// /// - public async Task SpawnNode() + public async Task SpawnMember() { var newMember = await SpawnClusterMember(_configure); - Members.Add(newMember); + _members.Add(newMember); + + return newMember; + } + + /// + /// Spawns a node, adds it to the cluster and member list + /// + /// + /// + public async Task SpawnClient() + { + var newMember = await SpawnClusterClient(_configure); + _clients.Add(newMember); return newMember; } public IList Members => _members; + public IList Clients => _clients; private static void InitOpenTelemetryTracing() { @@ -224,7 +249,7 @@ private static void InitOpenTelemetryTracing() { options .SetResourceBuilder(builder) - .AddOtlpExporter( (OtlpExporterOptions o) => + .AddOtlpExporter((OtlpExporterOptions o) => { o.Endpoint = endpoint; o.ExportProcessorType = ExportProcessorType.Batch; @@ -262,10 +287,10 @@ private async Task> SpawnClusterNodes( { var tasks = Enumerable.Range(0, count) .Select(_ => SpawnClusterMember(configure)); - + var res = (await Task.WhenAll(tasks)).ToList(); - - + + var consensus = res.Select(m => m.MemberList.TopologyConsensus(CancellationTokens.FromSeconds(10))); var x = await Task.WhenAll(consensus); if (x.Any(c => !c.consensus)) @@ -293,10 +318,7 @@ protected virtual async Task SpawnClusterMember(Func(); - system.EventStream.Subscribe(e => - { - logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e); - } + system.EventStream.Subscribe(e => { logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e); } ); var remoteConfig = GrpcNetRemoteConfig.BindToLocalhost().WithProtoMessages(MessagesReflection.Descriptor); @@ -309,6 +331,35 @@ protected virtual async Task SpawnClusterMember(Func SpawnClusterClient(Func? configure) + { + var config = ClusterConfig.Setup( + ClusterName, + GetClusterProvider(), + GetIdentityLookup(ClusterName) + ) + .WithHeartbeatExpiration(TimeSpan.Zero); + + config = configure?.Invoke(config) ?? config; + + var system = new ActorSystem(GetActorSystemConfig()); + system.Extensions.Register(new InstanceLogger(LogLevel.Debug, LogStore, category: system.Id)); + + var logger = system.Logger()?.BeginScope(); + + system.EventStream.Subscribe(e => { logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e); } + ); + + var remoteConfig = GrpcNetRemoteConfig.BindToLocalhost().WithProtoMessages(MessagesReflection.Descriptor); + var _ = new GrpcNetRemote(system, remoteConfig); + + var cluster = new Cluster(system, config); + + await cluster.StartClientAsync(); + + return cluster; + } + protected virtual ActorSystemConfig GetActorSystemConfig() { var actorSystemConfig = ActorSystemConfig.Setup(); @@ -317,7 +368,7 @@ protected virtual ActorSystemConfig GetActorSystemConfig() return TracingSettings.EnableTracing ? actorSystemConfig .WithConfigureProps(props => props.WithTracing().WithLoggingContextDecorator(_logger).WithLoggingContextDecorator(_logger)) - .WithConfigureSystemProps((name,props) => + .WithConfigureSystemProps((name, props) => { // if (name == "$gossip") // return props; @@ -427,4 +478,6 @@ public SingleNodeProviderFixture() : base(1, config => config.WithActorRequestTi protected override IClusterProvider GetClusterProvider() => new SingleNodeProvider(); protected override IIdentityLookup GetIdentityLookup(string clusterName) => new SingleNodeLookup(); + + public override bool SupportsClients => false; } \ No newline at end of file diff --git a/tests/Proto.Cluster.Tests/ClusterTests.cs b/tests/Proto.Cluster.Tests/ClusterTests.cs index f640f25e20..76d1cb9032 100644 --- a/tests/Proto.Cluster.Tests/ClusterTests.cs +++ b/tests/Proto.Cluster.Tests/ClusterTests.cs @@ -28,7 +28,6 @@ protected ClusterTests(ITestOutputHelper testOutputHelper, IClusterFixture clust [Fact] public void ClusterMembersMatch() { - var memberSet = Members.First().MemberList.GetMembers(); memberSet.Should().NotBeEmpty(); @@ -51,16 +50,46 @@ await Trace(async () => _testOutputHelper.WriteLine($"Spawned 1 actor in {timer.Elapsed}"); }, _testOutputHelper); } - + + [Fact] + public async Task ClientsCanCallCluster() + { + if (!ClusterFixture.SupportsClients) + return; + + await Trace(async () => + { + var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token; + + var clientNode = await ClusterFixture.SpawnClient(); + + try + { + await clientNode.JoinedCluster.WaitAsync(timeout); + clientNode.JoinedCluster.IsCompletedSuccessfully.Should().BeTrue(); + + var timer = Stopwatch.StartNew(); + await PingPong(clientNode, "client-unicorn", timeout); + timer.Stop(); + _testOutputHelper.WriteLine($"Spawned 1 actor in {timer.Elapsed}"); + } + catch + { + await ClusterFixture.RemoveNode(clientNode); + throw; + } + }, _testOutputHelper); + } + [Fact] public async Task TopologiesShouldHaveConsensus() { await Trace(async () => { var consensus = await Task - .WhenAll(Members.Select(member => - member.MemberList.TopologyConsensus(CancellationTokens.FromSeconds(20)))) - .WaitUpTo(TimeSpan.FromSeconds(20)) + .WhenAll(Members.Select(member => + member.MemberList.TopologyConsensus(CancellationTokens.FromSeconds(20)))) + .WaitUpTo(TimeSpan.FromSeconds(20)) ; _testOutputHelper.WriteLine(await Members.DumpClusterState()); @@ -217,7 +246,7 @@ await Trace(async () => _testOutputHelper.WriteLine("Removing node " + toBeRemoved.System.Id + " / " + toBeRemoved.System.Address); await ClusterFixture.RemoveNode(toBeRemoved); _testOutputHelper.WriteLine("Removed node " + toBeRemoved.System.Id + " / " + toBeRemoved.System.Address); - await ClusterFixture.SpawnNode(); + await ClusterFixture.SpawnMember(); await CanGetResponseFromAllIdsOnAllNodes(ids, Members, 20000); @@ -256,7 +285,7 @@ await Trace(async () => _testOutputHelper.WriteLine("Terminating node"); await ClusterFixture.RemoveNode(victim); _testOutputHelper.WriteLine("Spawning node"); - await ClusterFixture.SpawnNode(); + await ClusterFixture.SpawnMember(); await Task.Delay(1000); cts.Cancel(); await worker; @@ -468,7 +497,7 @@ private async Task PingPong( string id, CancellationToken token = default, string kind = EchoActor.Kind, - ISenderContext context= null + ISenderContext context = null ) { await Task.Yield(); diff --git a/tests/Proto.Cluster.Tests/GossipTests.cs b/tests/Proto.Cluster.Tests/GossipTests.cs index 6dbb495b1b..33bffb7f04 100644 --- a/tests/Proto.Cluster.Tests/GossipTests.cs +++ b/tests/Proto.Cluster.Tests/GossipTests.cs @@ -82,7 +82,7 @@ public async Task CompositeConsensusWorks() afterSettingMatchingState.value.Should().Be(initialTopologyHash); - await clusterFixture.SpawnNode(); + await clusterFixture.SpawnMember(); await Task.Delay(2000); // Allow topology state to propagate var afterChangingTopology = diff --git a/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs b/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs index 6760d7e942..fb9f47dd57 100644 --- a/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs +++ b/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs @@ -24,7 +24,7 @@ public async Task ShouldRetryRequestOnDeadLetterResponseRegardlessOfResponseType await member.RequestAsync(identity, EchoActor.Kind, new Ping(), CancellationTokens.FromSeconds(1)); // pretend we have an invalid PID in the cache - var otherMember = await fixture.SpawnNode(); + var otherMember = await fixture.SpawnMember(); if (member.PidCache.TryGet(ClusterIdentity.Create(identity, EchoActor.Kind), out var pid)) { var newPid = PID.FromAddress(otherMember.System.Address, pid.Id);