diff --git a/src/Proto.Cluster/Cluster.cs b/src/Proto.Cluster/Cluster.cs index 83cf7d952d..688b7185e7 100644 --- a/src/Proto.Cluster/Cluster.cs +++ b/src/Proto.Cluster/Cluster.cs @@ -200,7 +200,7 @@ private async Task BeginStartAsync(bool client) await Remote.StartAsync().ConfigureAwait(false); Logger.LogInformation("Starting"); - MemberList = new MemberList(this); + MemberList = new MemberList(this, client); _ = MemberList.Started.ContinueWith(_ => _joinedClusterTcs.TrySetResult(true)); ClusterContext = Config.ClusterContextProducer(this); diff --git a/src/Proto.Cluster/Member/MemberList.cs b/src/Proto.Cluster/Member/MemberList.cs index 87acf8e922..4e328fd836 100644 --- a/src/Proto.Cluster/Member/MemberList.cs +++ b/src/Proto.Cluster/Member/MemberList.cs @@ -60,12 +60,14 @@ public record MemberList private TaskCompletionSource _startedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); private IConsensusHandle? _topologyConsensus; + private readonly bool _asClient; - public MemberList(Cluster cluster) + public MemberList(Cluster cluster, bool asClient = false) { _cluster = cluster; _system = _cluster.System; _root = _system.Root; + _asClient = asClient; var (host, port) = _cluster.System.GetAddress(); Self = new Member @@ -99,9 +101,10 @@ public MemberList(Cluster cluster) { SelfBlocked(); } - + //only log if the member is known to us - if (TryGetMember(b.MemberId, out _)) { + if (TryGetMember(b.MemberId, out _)) + { Logger.LogInformation("Blocking member {MemberId} due to {Reason}", b.MemberId, b.Reason); } @@ -236,7 +239,7 @@ public void UpdateClusterTopology(IReadOnlyCollection members) if (!_startedTcs.Task.IsCompleted) { - if (activeMembers.Contains(_system.Id)) + if (_asClient || activeMembers.Contains(_system.Id)) { _startedTcs.TrySetResult(true); } @@ -282,23 +285,23 @@ void MemberJoin(Member newMember) Logger.LogError("Member {Member} already exists in MemberList", newMember); return; } - + var index = _nextMemberIndex++; _metaMembers = _metaMembers.SetItem(newMember.Id, new MetaMember(newMember, index)); _membersByIndex = _membersByIndex.SetItem(index, newMember); _indexByAddress = _indexByAddress.SetItem(newMember.Address, index); - + foreach (var kind in newMember.Kinds) { if (!_memberStrategyByKind.ContainsKey(kind)) { _memberStrategyByKind = _memberStrategyByKind.SetItem(kind, GetMemberStrategyByKind(kind)); } - + _memberStrategyByKind[kind].AddMember(newMember); } } - catch(Exception x) + catch (Exception x) { Logger.LogError(x, "Error during MemberJoin {Member}", newMember); } @@ -442,4 +445,4 @@ internal bool TryGetMemberByIndex(int memberIndex, out Member? value) => /// public Member[] GetMembersByKind(string kind) => _activeMembers.Members.Where(m => m.Kinds.Contains(kind)).ToArray(); -} +} \ No newline at end of file diff --git a/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs b/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs index d8ca8207f1..5cb0bc0d66 100644 --- a/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs +++ b/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs @@ -209,7 +209,7 @@ CancellationToken cancellationToken { _output.WriteLine($"[{DateTimeOffset.Now:O}] Starting cluster member"); - _ = clusterFixture.SpawnNode() + _ = clusterFixture.SpawnMember() .ContinueWith( t => { diff --git a/tests/Proto.Cluster.Tests/ClusterFixture.cs b/tests/Proto.Cluster.Tests/ClusterFixture.cs index d4c0ef7141..6f8ff65dbe 100644 --- a/tests/Proto.Cluster.Tests/ClusterFixture.cs +++ b/tests/Proto.Cluster.Tests/ClusterFixture.cs @@ -37,7 +37,8 @@ public interface IClusterFixture LogStore LogStore { get; } int ClusterSize { get; } - public Task SpawnNode(); + public Task SpawnMember(); + public Task SpawnClient(); Task RemoveNode(Cluster member, bool graceful = true); @@ -60,6 +61,7 @@ public abstract class ClusterFixture : IAsyncLifetime, IClusterFixture, IAsyncDi private readonly Func? _configure; private readonly ILogger _logger = Log.CreateLogger(nameof(GetType)); private readonly List _members = new(); + private readonly List _clients = new(); private static TracerProvider? _tracerProvider; private GithubActionsReporter _reporter; @@ -178,6 +180,11 @@ public async Task RemoveNode(Cluster member, bool graceful = true) Members.Remove(member); await member.ShutdownAsync(graceful, "Stopped by ClusterFixture"); } + else if (Clients.Contains(member)) + { + Clients.Remove(member); + await member.ShutdownAsync(graceful, "Stopped by ClusterFixture"); + } else { throw new ArgumentException("No such member"); @@ -194,15 +201,29 @@ public Task Trace(Func test, [CallerMemberName] string testName = "") /// /// /// - public async Task SpawnNode() + public async Task SpawnMember() { var newMember = await SpawnClusterMember(_configure); - Members.Add(newMember); + _members.Add(newMember); + + return newMember; + } + + /// + /// Spawns a node, adds it to the cluster and member list + /// + /// + /// + public async Task SpawnClient() + { + var newMember = await SpawnClusterClient(_configure); + _clients.Add(newMember); return newMember; } public IList Members => _members; + public IList Clients => _clients; private static void InitOpenTelemetryTracing() { @@ -303,11 +324,43 @@ protected virtual async Task SpawnClusterMember(Func SpawnClusterClient(Func? configure) + { + var config = ClusterConfig.Setup( + ClusterName, + GetClusterProvider(), + GetIdentityLookup(ClusterName) + ) + .WithHeartbeatExpiration(TimeSpan.Zero); + + config = configure?.Invoke(config) ?? config; + + var system = new ActorSystem(GetActorSystemConfig()); + system.Extensions.Register(new InstanceLogger(LogLevel.Debug, LogStore, category: system.Id)); + + var logger = system.Logger()?.BeginScope(); + + system.EventStream.Subscribe(e => + { + logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e); + } + ); + + var remoteConfig = GrpcNetRemoteConfig.BindToLocalhost().WithProtoMessages(MessagesReflection.Descriptor); + var _ = new GrpcNetRemote(system, remoteConfig); + + var cluster = new Cluster(system, config); + + await cluster.StartClientAsync(); + + return cluster; + } protected virtual ActorSystemConfig GetActorSystemConfig() { diff --git a/tests/Proto.Cluster.Tests/ClusterTests.cs b/tests/Proto.Cluster.Tests/ClusterTests.cs index f640f25e20..41241c3ea0 100644 --- a/tests/Proto.Cluster.Tests/ClusterTests.cs +++ b/tests/Proto.Cluster.Tests/ClusterTests.cs @@ -52,6 +52,33 @@ await Trace(async () => }, _testOutputHelper); } + [Fact] + public async Task ClientsCanCallCluster() + { + await Trace(async () => + { + var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token; + + var clientNode = await ClusterFixture.SpawnClient(); + + try + { + clientNode.JoinedCluster.IsCompletedSuccessfully.Should().BeTrue(); + + var timer = Stopwatch.StartNew(); + await PingPong(clientNode, "client-unicorn", timeout); + timer.Stop(); + _testOutputHelper.WriteLine($"Spawned 1 actor in {timer.Elapsed}"); + } + catch + { + await ClusterFixture.RemoveNode(clientNode); + throw; + } + + }, _testOutputHelper); + } + [Fact] public async Task TopologiesShouldHaveConsensus() { @@ -217,7 +244,7 @@ await Trace(async () => _testOutputHelper.WriteLine("Removing node " + toBeRemoved.System.Id + " / " + toBeRemoved.System.Address); await ClusterFixture.RemoveNode(toBeRemoved); _testOutputHelper.WriteLine("Removed node " + toBeRemoved.System.Id + " / " + toBeRemoved.System.Address); - await ClusterFixture.SpawnNode(); + await ClusterFixture.SpawnMember(); await CanGetResponseFromAllIdsOnAllNodes(ids, Members, 20000); @@ -256,7 +283,7 @@ await Trace(async () => _testOutputHelper.WriteLine("Terminating node"); await ClusterFixture.RemoveNode(victim); _testOutputHelper.WriteLine("Spawning node"); - await ClusterFixture.SpawnNode(); + await ClusterFixture.SpawnMember(); await Task.Delay(1000); cts.Cancel(); await worker; diff --git a/tests/Proto.Cluster.Tests/GossipTests.cs b/tests/Proto.Cluster.Tests/GossipTests.cs index 6dbb495b1b..33bffb7f04 100644 --- a/tests/Proto.Cluster.Tests/GossipTests.cs +++ b/tests/Proto.Cluster.Tests/GossipTests.cs @@ -82,7 +82,7 @@ public async Task CompositeConsensusWorks() afterSettingMatchingState.value.Should().Be(initialTopologyHash); - await clusterFixture.SpawnNode(); + await clusterFixture.SpawnMember(); await Task.Delay(2000); // Allow topology state to propagate var afterChangingTopology = diff --git a/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs b/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs index 6760d7e942..fb9f47dd57 100644 --- a/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs +++ b/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs @@ -24,7 +24,7 @@ public async Task ShouldRetryRequestOnDeadLetterResponseRegardlessOfResponseType await member.RequestAsync(identity, EchoActor.Kind, new Ping(), CancellationTokens.FromSeconds(1)); // pretend we have an invalid PID in the cache - var otherMember = await fixture.SpawnNode(); + var otherMember = await fixture.SpawnMember(); if (member.PidCache.TryGet(ClusterIdentity.Create(identity, EchoActor.Kind), out var pid)) { var newPid = PID.FromAddress(otherMember.System.Address, pid.Id);