diff --git a/src/Proto.Cluster/Cluster.cs b/src/Proto.Cluster/Cluster.cs index 83cf7d952d..688b7185e7 100644 --- a/src/Proto.Cluster/Cluster.cs +++ b/src/Proto.Cluster/Cluster.cs @@ -200,7 +200,7 @@ private async Task BeginStartAsync(bool client) await Remote.StartAsync().ConfigureAwait(false); Logger.LogInformation("Starting"); - MemberList = new MemberList(this); + MemberList = new MemberList(this, client); _ = MemberList.Started.ContinueWith(_ => _joinedClusterTcs.TrySetResult(true)); ClusterContext = Config.ClusterContextProducer(this); diff --git a/src/Proto.Cluster/Member/MemberList.cs b/src/Proto.Cluster/Member/MemberList.cs index 87acf8e922..4e328fd836 100644 --- a/src/Proto.Cluster/Member/MemberList.cs +++ b/src/Proto.Cluster/Member/MemberList.cs @@ -60,12 +60,14 @@ public record MemberList private TaskCompletionSource _startedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); private IConsensusHandle? _topologyConsensus; + private readonly bool _asClient; - public MemberList(Cluster cluster) + public MemberList(Cluster cluster, bool asClient = false) { _cluster = cluster; _system = _cluster.System; _root = _system.Root; + _asClient = asClient; var (host, port) = _cluster.System.GetAddress(); Self = new Member @@ -99,9 +101,10 @@ public MemberList(Cluster cluster) { SelfBlocked(); } - + //only log if the member is known to us - if (TryGetMember(b.MemberId, out _)) { + if (TryGetMember(b.MemberId, out _)) + { Logger.LogInformation("Blocking member {MemberId} due to {Reason}", b.MemberId, b.Reason); } @@ -236,7 +239,7 @@ public void UpdateClusterTopology(IReadOnlyCollection members) if (!_startedTcs.Task.IsCompleted) { - if (activeMembers.Contains(_system.Id)) + if (_asClient || activeMembers.Contains(_system.Id)) { _startedTcs.TrySetResult(true); } @@ -282,23 +285,23 @@ void MemberJoin(Member newMember) Logger.LogError("Member {Member} already exists in MemberList", newMember); return; } - + var index = _nextMemberIndex++; _metaMembers = _metaMembers.SetItem(newMember.Id, new MetaMember(newMember, index)); _membersByIndex = _membersByIndex.SetItem(index, newMember); _indexByAddress = _indexByAddress.SetItem(newMember.Address, index); - + foreach (var kind in newMember.Kinds) { if (!_memberStrategyByKind.ContainsKey(kind)) { _memberStrategyByKind = _memberStrategyByKind.SetItem(kind, GetMemberStrategyByKind(kind)); } - + _memberStrategyByKind[kind].AddMember(newMember); } } - catch(Exception x) + catch (Exception x) { Logger.LogError(x, "Error during MemberJoin {Member}", newMember); } @@ -442,4 +445,4 @@ internal bool TryGetMemberByIndex(int memberIndex, out Member? value) => /// public Member[] GetMembersByKind(string kind) => _activeMembers.Members.Where(m => m.Kinds.Contains(kind)).ToArray(); -} +} \ No newline at end of file diff --git a/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs b/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs index d8ca8207f1..5cb0bc0d66 100644 --- a/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs +++ b/tests/Proto.Cluster.PartitionIdentity.Tests/PartitionIdentityTests.cs @@ -209,7 +209,7 @@ CancellationToken cancellationToken { _output.WriteLine($"[{DateTimeOffset.Now:O}] Starting cluster member"); - _ = clusterFixture.SpawnNode() + _ = clusterFixture.SpawnMember() .ContinueWith( t => { diff --git a/tests/Proto.Cluster.Tests/ClusterFixture.cs b/tests/Proto.Cluster.Tests/ClusterFixture.cs index d4c0ef7141..05b184a516 100644 --- a/tests/Proto.Cluster.Tests/ClusterFixture.cs +++ b/tests/Proto.Cluster.Tests/ClusterFixture.cs @@ -34,10 +34,13 @@ public interface IClusterFixture { IList Members { get; } + bool SupportsClients { get; } + LogStore LogStore { get; } int ClusterSize { get; } - public Task SpawnNode(); + public Task SpawnMember(); + public Task SpawnClient(); Task RemoveNode(Cluster member, bool graceful = true); @@ -55,11 +58,12 @@ public abstract class ClusterFixture : IAsyncLifetime, IClusterFixture, IAsyncDi { private static readonly object Lock = new(); - + public const string InvalidIdentity = "invalid"; private readonly Func? _configure; private readonly ILogger _logger = Log.CreateLogger(nameof(GetType)); private readonly List _members = new(); + private readonly List _clients = new(); private static TracerProvider? _tracerProvider; private GithubActionsReporter _reporter; @@ -69,14 +73,14 @@ static ClusterFixture() { TracingSettings.OpenTelemetryUrl = Environment.GetEnvironmentVariable("OPENTELEMETRY_URL"); TracingSettings.TraceViewUrl = Environment.GetEnvironmentVariable("TRACEVIEW_URL"); - // TracingSettings.OpenTelemetryUrl = "http://Localhost:4317"; + // TracingSettings.OpenTelemetryUrl = "http://Localhost:4317"; TracingSettings.EnableTracing = TracingSettings.OpenTelemetryUrl != null; //TODO: check if this helps low resource envs like github actions. ThreadPool.SetMinThreads(40, 40); } - protected ClusterFixture( int clusterSize, Func? configure = null) + protected ClusterFixture(int clusterSize, Func? configure = null) { _reporter = new GithubActionsReporter(GetType().Name); ClusterSize = clusterSize; @@ -87,11 +91,14 @@ protected ClusterFixture( int clusterSize, Func? c // ReSharper disable once HeuristicUnreachableCode if (TracingSettings.EnableTracing) { - InitOpenTelemetryTracing(); + InitOpenTelemetryTracing(); } #pragma warning restore CS0162 } + public virtual bool SupportsClients => true; + + protected virtual ClusterKind[] ClusterKinds => new[] { new ClusterKind(EchoActor.Kind, EchoActor.Props.WithClusterRequestDeduplication()), @@ -122,7 +129,7 @@ public async Task DisposeAsync() try { _tracerProvider?.ForceFlush(); - + await _reporter.WriteReportFile(); await OnDisposing(); @@ -154,11 +161,10 @@ private async Task WaitForMembersToShutdown() _logger.LogInformation("Shutting down cluster member {MemberId}", cluster.System.Id); var done = await task.WaitUpTo(TimeSpan.FromSeconds(5)); - if (! done) + if (!done) { _logger.LogWarning("Failed to shutdown cluster member {MemberId} gracefully", cluster.System.Id); } - } catch (Exception e) { @@ -178,6 +184,11 @@ public async Task RemoveNode(Cluster member, bool graceful = true) Members.Remove(member); await member.ShutdownAsync(graceful, "Stopped by ClusterFixture"); } + else if (Clients.Contains(member)) + { + Clients.Remove(member); + await member.ShutdownAsync(graceful, "Stopped by ClusterFixture"); + } else { throw new ArgumentException("No such member"); @@ -194,15 +205,29 @@ public Task Trace(Func test, [CallerMemberName] string testName = "") /// /// /// - public async Task SpawnNode() + public async Task SpawnMember() { var newMember = await SpawnClusterMember(_configure); - Members.Add(newMember); + _members.Add(newMember); + + return newMember; + } + + /// + /// Spawns a node, adds it to the cluster and member list + /// + /// + /// + public async Task SpawnClient() + { + var newMember = await SpawnClusterClient(_configure); + _clients.Add(newMember); return newMember; } public IList Members => _members; + public IList Clients => _clients; private static void InitOpenTelemetryTracing() { @@ -224,7 +249,7 @@ private static void InitOpenTelemetryTracing() { options .SetResourceBuilder(builder) - .AddOtlpExporter( (OtlpExporterOptions o) => + .AddOtlpExporter((OtlpExporterOptions o) => { o.Endpoint = endpoint; o.ExportProcessorType = ExportProcessorType.Batch; @@ -262,10 +287,10 @@ private async Task> SpawnClusterNodes( { var tasks = Enumerable.Range(0, count) .Select(_ => SpawnClusterMember(configure)); - + var res = (await Task.WhenAll(tasks)).ToList(); - - + + var consensus = res.Select(m => m.MemberList.TopologyConsensus(CancellationTokens.FromSeconds(10))); var x = await Task.WhenAll(consensus); if (x.Any(c => !c.consensus)) @@ -293,10 +318,7 @@ protected virtual async Task SpawnClusterMember(Func(); - system.EventStream.Subscribe(e => - { - logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e); - } + system.EventStream.Subscribe(e => { logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e); } ); var remoteConfig = GrpcNetRemoteConfig.BindToLocalhost().WithProtoMessages(MessagesReflection.Descriptor); @@ -309,6 +331,35 @@ protected virtual async Task SpawnClusterMember(Func SpawnClusterClient(Func? configure) + { + var config = ClusterConfig.Setup( + ClusterName, + GetClusterProvider(), + GetIdentityLookup(ClusterName) + ) + .WithHeartbeatExpiration(TimeSpan.Zero); + + config = configure?.Invoke(config) ?? config; + + var system = new ActorSystem(GetActorSystemConfig()); + system.Extensions.Register(new InstanceLogger(LogLevel.Debug, LogStore, category: system.Id)); + + var logger = system.Logger()?.BeginScope(); + + system.EventStream.Subscribe(e => { logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e); } + ); + + var remoteConfig = GrpcNetRemoteConfig.BindToLocalhost().WithProtoMessages(MessagesReflection.Descriptor); + var _ = new GrpcNetRemote(system, remoteConfig); + + var cluster = new Cluster(system, config); + + await cluster.StartClientAsync(); + + return cluster; + } + protected virtual ActorSystemConfig GetActorSystemConfig() { var actorSystemConfig = ActorSystemConfig.Setup(); @@ -317,7 +368,7 @@ protected virtual ActorSystemConfig GetActorSystemConfig() return TracingSettings.EnableTracing ? actorSystemConfig .WithConfigureProps(props => props.WithTracing().WithLoggingContextDecorator(_logger).WithLoggingContextDecorator(_logger)) - .WithConfigureSystemProps((name,props) => + .WithConfigureSystemProps((name, props) => { // if (name == "$gossip") // return props; @@ -427,4 +478,6 @@ public SingleNodeProviderFixture() : base(1, config => config.WithActorRequestTi protected override IClusterProvider GetClusterProvider() => new SingleNodeProvider(); protected override IIdentityLookup GetIdentityLookup(string clusterName) => new SingleNodeLookup(); + + public override bool SupportsClients => false; } \ No newline at end of file diff --git a/tests/Proto.Cluster.Tests/ClusterTests.cs b/tests/Proto.Cluster.Tests/ClusterTests.cs index f640f25e20..76d1cb9032 100644 --- a/tests/Proto.Cluster.Tests/ClusterTests.cs +++ b/tests/Proto.Cluster.Tests/ClusterTests.cs @@ -28,7 +28,6 @@ protected ClusterTests(ITestOutputHelper testOutputHelper, IClusterFixture clust [Fact] public void ClusterMembersMatch() { - var memberSet = Members.First().MemberList.GetMembers(); memberSet.Should().NotBeEmpty(); @@ -51,16 +50,46 @@ await Trace(async () => _testOutputHelper.WriteLine($"Spawned 1 actor in {timer.Elapsed}"); }, _testOutputHelper); } - + + [Fact] + public async Task ClientsCanCallCluster() + { + if (!ClusterFixture.SupportsClients) + return; + + await Trace(async () => + { + var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token; + + var clientNode = await ClusterFixture.SpawnClient(); + + try + { + await clientNode.JoinedCluster.WaitAsync(timeout); + clientNode.JoinedCluster.IsCompletedSuccessfully.Should().BeTrue(); + + var timer = Stopwatch.StartNew(); + await PingPong(clientNode, "client-unicorn", timeout); + timer.Stop(); + _testOutputHelper.WriteLine($"Spawned 1 actor in {timer.Elapsed}"); + } + catch + { + await ClusterFixture.RemoveNode(clientNode); + throw; + } + }, _testOutputHelper); + } + [Fact] public async Task TopologiesShouldHaveConsensus() { await Trace(async () => { var consensus = await Task - .WhenAll(Members.Select(member => - member.MemberList.TopologyConsensus(CancellationTokens.FromSeconds(20)))) - .WaitUpTo(TimeSpan.FromSeconds(20)) + .WhenAll(Members.Select(member => + member.MemberList.TopologyConsensus(CancellationTokens.FromSeconds(20)))) + .WaitUpTo(TimeSpan.FromSeconds(20)) ; _testOutputHelper.WriteLine(await Members.DumpClusterState()); @@ -217,7 +246,7 @@ await Trace(async () => _testOutputHelper.WriteLine("Removing node " + toBeRemoved.System.Id + " / " + toBeRemoved.System.Address); await ClusterFixture.RemoveNode(toBeRemoved); _testOutputHelper.WriteLine("Removed node " + toBeRemoved.System.Id + " / " + toBeRemoved.System.Address); - await ClusterFixture.SpawnNode(); + await ClusterFixture.SpawnMember(); await CanGetResponseFromAllIdsOnAllNodes(ids, Members, 20000); @@ -256,7 +285,7 @@ await Trace(async () => _testOutputHelper.WriteLine("Terminating node"); await ClusterFixture.RemoveNode(victim); _testOutputHelper.WriteLine("Spawning node"); - await ClusterFixture.SpawnNode(); + await ClusterFixture.SpawnMember(); await Task.Delay(1000); cts.Cancel(); await worker; @@ -468,7 +497,7 @@ private async Task PingPong( string id, CancellationToken token = default, string kind = EchoActor.Kind, - ISenderContext context= null + ISenderContext context = null ) { await Task.Yield(); diff --git a/tests/Proto.Cluster.Tests/GossipTests.cs b/tests/Proto.Cluster.Tests/GossipTests.cs index 6dbb495b1b..33bffb7f04 100644 --- a/tests/Proto.Cluster.Tests/GossipTests.cs +++ b/tests/Proto.Cluster.Tests/GossipTests.cs @@ -82,7 +82,7 @@ public async Task CompositeConsensusWorks() afterSettingMatchingState.value.Should().Be(initialTopologyHash); - await clusterFixture.SpawnNode(); + await clusterFixture.SpawnMember(); await Task.Delay(2000); // Allow topology state to propagate var afterChangingTopology = diff --git a/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs b/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs index 6760d7e942..fb9f47dd57 100644 --- a/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs +++ b/tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs @@ -24,7 +24,7 @@ public async Task ShouldRetryRequestOnDeadLetterResponseRegardlessOfResponseType await member.RequestAsync(identity, EchoActor.Kind, new Ping(), CancellationTokens.FromSeconds(1)); // pretend we have an invalid PID in the cache - var otherMember = await fixture.SpawnNode(); + var otherMember = await fixture.SpawnMember(); if (member.PidCache.TryGet(ClusterIdentity.Create(identity, EchoActor.Kind), out var pid)) { var newPid = PID.FromAddress(otherMember.System.Address, pid.Id);