From 2e0acd13c05bffd54d2e84cd394e94844fe778be Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Wed, 15 Nov 2023 22:05:14 +0100 Subject: [PATCH] Fixed issue with client cluster logic where it would not see itself as started unless listed by the cluster provider. Changed member list to accept any cluster topology updates when in client mode and set started. Added tests for client mode requests. --- src/Proto.Cluster/Cluster.cs | 2 +- src/Proto.Cluster/Member/MemberList.cs | 21 ++++--- .../PartitionIdentityTests.cs | 2 +- tests/Proto.Cluster.Tests/ClusterFixture.cs | 61 +++++++++++++++++-- tests/Proto.Cluster.Tests/ClusterTests.cs | 31 +++++++++- tests/Proto.Cluster.Tests/GossipTests.cs | 2 +- .../RetryOnDeadLetterTests.cs | 2 +- 7 files changed, 102 insertions(+), 19 deletions(-) diff --git a/src/Proto.Cluster/Cluster.cs b/src/Proto.Cluster/Cluster.cs index 83cf7d952d..688b7185e7 100644 --- a/src/Proto.Cluster/Cluster.cs +++ b/src/Proto.Cluster/Cluster.cs @@ -200,7 +200,7 @@ private async Task BeginStartAsync(bool client) await Remote.StartAsync().ConfigureAwait(false); Logger.LogInformation("Starting"); - MemberList = new MemberList(this); + MemberList = new MemberList(this, client); _ = MemberList.Started.ContinueWith(_ => _joinedClusterTcs.TrySetResult(true)); ClusterContext = Config.ClusterContextProducer(this); diff --git a/src/Proto.Cluster/Member/MemberList.cs b/src/Proto.Cluster/Member/MemberList.cs index 87acf8e922..4e328fd836 100644 --- a/src/Proto.Cluster/Member/MemberList.cs +++ b/src/Proto.Cluster/Member/MemberList.cs @@ -60,12 +60,14 @@ public record MemberList private TaskCompletionSource _startedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); private IConsensusHandle? _topologyConsensus; + private readonly bool _asClient; - public MemberList(Cluster cluster) + public MemberList(Cluster cluster, bool asClient = false) { _cluster = cluster; _system = _cluster.System; _root = _system.Root; + _asClient = asClient; var (host, port) = _cluster.System.GetAddress(); Self = new Member @@ -99,9 +101,10 @@ public MemberList(Cluster cluster) { SelfBlocked(); } - + //only log if the member is known to us - if (TryGetMember(b.MemberId, out _)) { + if (TryGetMember(b.MemberId, out _)) + { Logger.LogInformation("Blocking member {MemberId} due to {Reason}", b.MemberId, b.Reason); } @@ -236,7 +239,7 @@ public void UpdateClusterTopology(IReadOnlyCollection members) if (!_startedTcs.Task.IsCompleted) { - if (activeMembers.Contains(_system.Id)) + if (_asClient || activeMembers.Contains(_system.Id)) { _startedTcs.TrySetResult(true); } @@ -282,23 +285,23 @@ void MemberJoin(Member newMember) Logger.LogError("Member {Member} already exists in MemberList", newMember); return; } - + var index = _nextMemberIndex++; _metaMembers = _metaMembers.SetItem(newMember.Id, new MetaMember(newMember, index)); _membersByIndex = _membersByIndex.SetItem(index, newMember); _indexByAddress = _indexByAddress.SetItem(newMember.Address, index); - + foreach (var kind in newMember.Kinds) { if (!_memberStrategyByKind.ContainsKey(kind)) { _memberStrategyByKind = _memberStrategyByKind.SetItem(kind, GetMemberStrategyByKind(kind)); } - + _memberStrategyByKind[kind].AddMember(newMember); } } - catch(Exception x) + catch (Exception x) { Logger.LogError(x, "Error during MemberJoin {Member}", newMember); } @@ -442,4 +445,4 @@ internal bool TryGetMemberByIndex(int memberIndex, out Member? value) => /// public Member[] GetMembersByKind(string kind) => _activeMembers.Members.Where(m => m.Kinds.Contains(kind)).ToArray(); -} +} \ No newline at end of file diff --git a/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs b/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs index d8ca8207f1..5cb0bc0d66 100644 --- a/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs +++ b/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs @@ -209,7 +209,7 @@ CancellationToken cancellationToken { _output.WriteLine($"[{DateTimeOffset.Now:O}] Starting cluster member"); - _ = clusterFixture.SpawnNode() + _ = clusterFixture.SpawnMember() .ContinueWith( t => { diff --git a/tests/Proto.Cluster.Tests/ClusterFixture.cs b/tests/Proto.Cluster.Tests/ClusterFixture.cs index d4c0ef7141..6f8ff65dbe 100644 --- a/tests/Proto.Cluster.Tests/ClusterFixture.cs +++ b/tests/Proto.Cluster.Tests/ClusterFixture.cs @@ -37,7 +37,8 @@ public interface IClusterFixture LogStore LogStore { get; } int ClusterSize { get; } - public Task SpawnNode(); + public Task SpawnMember(); + public Task SpawnClient(); Task RemoveNode(Cluster member, bool graceful = true); @@ -60,6 +61,7 @@ public abstract class ClusterFixture : IAsyncLifetime, IClusterFixture, IAsyncDi private readonly Func? _configure; private readonly ILogger _logger = Log.CreateLogger(nameof(GetType)); private readonly List _members = new(); + private readonly List _clients = new(); private static TracerProvider? _tracerProvider; private GithubActionsReporter _reporter; @@ -178,6 +180,11 @@ public async Task RemoveNode(Cluster member, bool graceful = true) Members.Remove(member); await member.ShutdownAsync(graceful, "Stopped by ClusterFixture"); } + else if (Clients.Contains(member)) + { + Clients.Remove(member); + await member.ShutdownAsync(graceful, "Stopped by ClusterFixture"); + } else { throw new ArgumentException("No such member"); @@ -194,15 +201,29 @@ public Task Trace(Func test, [CallerMemberName] string testName = "") /// /// /// - public async Task SpawnNode() + public async Task SpawnMember() { var newMember = await SpawnClusterMember(_configure); - Members.Add(newMember); + _members.Add(newMember); + + return newMember; + } + + /// + /// Spawns a node, adds it to the cluster and member list + /// + /// + /// + public async Task SpawnClient() + { + var newMember = await SpawnClusterClient(_configure); + _clients.Add(newMember); return newMember; } public IList Members => _members; + public IList Clients => _clients; private static void InitOpenTelemetryTracing() { @@ -303,11 +324,43 @@ protected virtual async Task SpawnClusterMember(Func SpawnClusterClient(Func? configure) + { + var config = ClusterConfig.Setup( + ClusterName, + GetClusterProvider(), + GetIdentityLookup(ClusterName) + ) + .WithHeartbeatExpiration(TimeSpan.Zero); + + config = configure?.Invoke(config) ?? config; + + var system = new ActorSystem(GetActorSystemConfig()); + system.Extensions.Register(new InstanceLogger(LogLevel.Debug, LogStore, category: system.Id)); + + var logger = system.Logger()?.BeginScope(); + + system.EventStream.Subscribe(e => + { + logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e); + } + ); + + var remoteConfig = GrpcNetRemoteConfig.BindToLocalhost().WithProtoMessages(MessagesReflection.Descriptor); + var _ = new GrpcNetRemote(system, remoteConfig); + + var cluster = new Cluster(system, config); + + await cluster.StartClientAsync(); + + return cluster; + } protected virtual ActorSystemConfig GetActorSystemConfig() { diff --git a/tests/Proto.Cluster.Tests/ClusterTests.cs b/tests/Proto.Cluster.Tests/ClusterTests.cs index f640f25e20..41241c3ea0 100644 --- a/tests/Proto.Cluster.Tests/ClusterTests.cs +++ b/tests/Proto.Cluster.Tests/ClusterTests.cs @@ -52,6 +52,33 @@ await Trace(async () => }, _testOutputHelper); } + [Fact] + public async Task ClientsCanCallCluster() + { + await Trace(async () => + { + var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token; + + var clientNode = await ClusterFixture.SpawnClient(); + + try + { + clientNode.JoinedCluster.IsCompletedSuccessfully.Should().BeTrue(); + + var timer = Stopwatch.StartNew(); + await PingPong(clientNode, "client-unicorn", timeout); + timer.Stop(); + _testOutputHelper.WriteLine($"Spawned 1 actor in {timer.Elapsed}"); + } + catch + { + await ClusterFixture.RemoveNode(clientNode); + throw; + } + + }, _testOutputHelper); + } + [Fact] public async Task TopologiesShouldHaveConsensus() { @@ -217,7 +244,7 @@ await Trace(async () => _testOutputHelper.WriteLine("Removing node " + toBeRemoved.System.Id + " / " + toBeRemoved.System.Address); await ClusterFixture.RemoveNode(toBeRemoved); _testOutputHelper.WriteLine("Removed node " + toBeRemoved.System.Id + " / " + toBeRemoved.System.Address); - await ClusterFixture.SpawnNode(); + await ClusterFixture.SpawnMember(); await CanGetResponseFromAllIdsOnAllNodes(ids, Members, 20000); @@ -256,7 +283,7 @@ await Trace(async () => _testOutputHelper.WriteLine("Terminating node"); await ClusterFixture.RemoveNode(victim); _testOutputHelper.WriteLine("Spawning node"); - await ClusterFixture.SpawnNode(); + await ClusterFixture.SpawnMember(); await Task.Delay(1000); cts.Cancel(); await worker; diff --git a/tests/Proto.Cluster.Tests/GossipTests.cs b/tests/Proto.Cluster.Tests/GossipTests.cs index 6dbb495b1b..33bffb7f04 100644 --- a/tests/Proto.Cluster.Tests/GossipTests.cs +++ b/tests/Proto.Cluster.Tests/GossipTests.cs @@ -82,7 +82,7 @@ public async Task CompositeConsensusWorks() afterSettingMatchingState.value.Should().Be(initialTopologyHash); - await clusterFixture.SpawnNode(); + await clusterFixture.SpawnMember(); await Task.Delay(2000); // Allow topology state to propagate var afterChangingTopology = diff --git a/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs b/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs index 6760d7e942..fb9f47dd57 100644 --- a/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs +++ b/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs @@ -24,7 +24,7 @@ public async Task ShouldRetryRequestOnDeadLetterResponseRegardlessOfResponseType await member.RequestAsync(identity, EchoActor.Kind, new Ping(), CancellationTokens.FromSeconds(1)); // pretend we have an invalid PID in the cache - var otherMember = await fixture.SpawnNode(); + var otherMember = await fixture.SpawnMember(); if (member.PidCache.TryGet(ClusterIdentity.Create(identity, EchoActor.Kind), out var pid)) { var newPid = PID.FromAddress(otherMember.System.Address, pid.Id);