Skip to content

Commit

Permalink
Fixed issue with client cluster logic where it would not see itself as started unless listed by the cluster provider.
Browse files Browse the repository at this point in the history
Changed member list to accept any cluster topology updates when in client mode and mark itself as started.

Added tests for client mode requests.
  • Loading branch information
mhelleborg committed Nov 15, 2023
1 parent b06ae68 commit 2e0acd1
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/Proto.Cluster/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ private async Task BeginStartAsync(bool client)
await Remote.StartAsync().ConfigureAwait(false);

Logger.LogInformation("Starting");
MemberList = new MemberList(this);
MemberList = new MemberList(this, client);
_ = MemberList.Started.ContinueWith(_ => _joinedClusterTcs.TrySetResult(true));
ClusterContext = Config.ClusterContextProducer(this);

Expand Down
21 changes: 12 additions & 9 deletions src/Proto.Cluster/Member/MemberList.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ public record MemberList

private TaskCompletionSource<bool> _startedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
private IConsensusHandle<ulong>? _topologyConsensus;
private readonly bool _asClient;

public MemberList(Cluster cluster)
public MemberList(Cluster cluster, bool asClient = false)
{
_cluster = cluster;
_system = _cluster.System;
_root = _system.Root;
_asClient = asClient;
var (host, port) = _cluster.System.GetAddress();

Self = new Member
Expand Down Expand Up @@ -99,9 +101,10 @@ public MemberList(Cluster cluster)
{
SelfBlocked();
}

//only log if the member is known to us
if (TryGetMember(b.MemberId, out _)) {
if (TryGetMember(b.MemberId, out _))
{
Logger.LogInformation("Blocking member {MemberId} due to {Reason}", b.MemberId, b.Reason);
}

Expand Down Expand Up @@ -236,7 +239,7 @@ public void UpdateClusterTopology(IReadOnlyCollection<Member> members)

if (!_startedTcs.Task.IsCompleted)
{
if (activeMembers.Contains(_system.Id))
if (_asClient || activeMembers.Contains(_system.Id))
{
_startedTcs.TrySetResult(true);
}
Expand Down Expand Up @@ -282,23 +285,23 @@ void MemberJoin(Member newMember)
Logger.LogError("Member {Member} already exists in MemberList", newMember);
return;
}

var index = _nextMemberIndex++;
_metaMembers = _metaMembers.SetItem(newMember.Id, new MetaMember(newMember, index));
_membersByIndex = _membersByIndex.SetItem(index, newMember);
_indexByAddress = _indexByAddress.SetItem(newMember.Address, index);

foreach (var kind in newMember.Kinds)
{
if (!_memberStrategyByKind.ContainsKey(kind))
{
_memberStrategyByKind = _memberStrategyByKind.SetItem(kind, GetMemberStrategyByKind(kind));
}

_memberStrategyByKind[kind].AddMember(newMember);
}
}
catch(Exception x)
catch (Exception x)
{
Logger.LogError(x, "Error during MemberJoin {Member}", newMember);
}
Expand Down Expand Up @@ -442,4 +445,4 @@ internal bool TryGetMemberByIndex(int memberIndex, out Member? value) =>
/// <returns></returns>
public Member[] GetMembersByKind(string kind) =>
_activeMembers.Members.Where(m => m.Kinds.Contains(kind)).ToArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ CancellationToken cancellationToken
{
_output.WriteLine($"[{DateTimeOffset.Now:O}] Starting cluster member");

_ = clusterFixture.SpawnNode()
_ = clusterFixture.SpawnMember()
.ContinueWith(
t =>
{
Expand Down
61 changes: 57 additions & 4 deletions tests/Proto.Cluster.Tests/ClusterFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public interface IClusterFixture
LogStore LogStore { get; }
int ClusterSize { get; }

public Task<Cluster> SpawnNode();
public Task<Cluster> SpawnMember();
public Task<Cluster> SpawnClient();

Task RemoveNode(Cluster member, bool graceful = true);

Expand All @@ -60,6 +61,7 @@ public abstract class ClusterFixture : IAsyncLifetime, IClusterFixture, IAsyncDi
private readonly Func<ClusterConfig, ClusterConfig>? _configure;
private readonly ILogger _logger = Log.CreateLogger(nameof(GetType));
private readonly List<Cluster> _members = new();
private readonly List<Cluster> _clients = new();
private static TracerProvider? _tracerProvider;
private GithubActionsReporter _reporter;

Expand Down Expand Up @@ -178,6 +180,11 @@ public async Task RemoveNode(Cluster member, bool graceful = true)
Members.Remove(member);
await member.ShutdownAsync(graceful, "Stopped by ClusterFixture");
}
else if (Clients.Contains(member))
{
Clients.Remove(member);
await member.ShutdownAsync(graceful, "Stopped by ClusterFixture");
}
else
{
throw new ArgumentException("No such member");
Expand All @@ -194,15 +201,29 @@ public Task Trace(Func<Task> test, [CallerMemberName] string testName = "")
/// </summary>
/// <returns></returns>
/// <exception cref="ArgumentException"></exception>
public async Task<Cluster> SpawnNode()
public async Task<Cluster> SpawnMember()
{
var newMember = await SpawnClusterMember(_configure);
Members.Add(newMember);
_members.Add(newMember);

return newMember;
}

/// <summary>
/// Starts a new cluster client via <c>SpawnClusterClient</c>, passing the fixture's
/// configuration callback, and records it in the fixture's client list.
/// </summary>
/// <returns>The started client cluster instance.</returns>
public async Task<Cluster> SpawnClient()
{
    var client = await SpawnClusterClient(_configure);

    // Tracked separately from members so the fixture can find it again later.
    _clients.Add(client);

    return client;
}

public IList<Cluster> Members => _members;
public IList<Cluster> Clients => _clients;

private static void InitOpenTelemetryTracing()
{
Expand Down Expand Up @@ -303,11 +324,43 @@ protected virtual async Task<Cluster> SpawnClusterMember(Func<ClusterConfig, Clu
var _ = new GrpcNetRemote(system, remoteConfig);

var cluster = new Cluster(system, config);

await cluster.StartMemberAsync();

return cluster;
}

/// <summary>
/// Builds a cluster configuration and a fresh actor system, wires up event stream
/// debug logging and a gRPC remote bound to localhost, and starts the resulting
/// cluster with <c>StartClientAsync</c>.
/// </summary>
/// <param name="configure">
/// Optional callback that may replace the base cluster configuration. When it is
/// null, or returns null, the base configuration is used unchanged.
/// </param>
/// <returns>The started client cluster.</returns>
protected virtual async Task<Cluster> SpawnClusterClient(Func<ClusterConfig, ClusterConfig>? configure)
{
    var baseConfig = ClusterConfig
        .Setup(ClusterName, GetClusterProvider(), GetIdentityLookup(ClusterName))
        .WithHeartbeatExpiration(TimeSpan.Zero);

    // Let the caller customise the configuration, falling back to the base one.
    var clusterConfig = baseConfig;
    if (configure is not null)
    {
        clusterConfig = configure(baseConfig) ?? baseConfig;
    }

    var actorSystem = new ActorSystem(GetActorSystemConfig());
    actorSystem.Extensions.Register(new InstanceLogger(LogLevel.Debug, LogStore, category: actorSystem.Id));

    // Mirror every event stream message into the debug log (when a logger is available).
    var eventLogger = actorSystem.Logger()?.BeginScope<EventStream>();
    actorSystem.EventStream.Subscribe<object>(evt =>
        {
            eventLogger?.LogDebug("EventStream {MessageType}:{MessagePayload}", evt.GetType().Name, evt);
        }
    );

    var grpcConfig = GrpcNetRemoteConfig
        .BindToLocalhost()
        .WithProtoMessages(MessagesReflection.Descriptor);

    // The instance itself is not used afterwards; presumably the constructor
    // attaches the remote to the actor system — TODO confirm.
    _ = new GrpcNetRemote(actorSystem, grpcConfig);

    var client = new Cluster(actorSystem, clusterConfig);
    await client.StartClientAsync();

    return client;
}

protected virtual ActorSystemConfig GetActorSystemConfig()
{
Expand Down
31 changes: 29 additions & 2 deletions tests/Proto.Cluster.Tests/ClusterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,33 @@ await Trace(async () =>
}, _testOutputHelper);
}

/// <summary>
/// Spawns a node in client mode, checks that it reports itself as joined, and
/// verifies that it can complete a ping/pong request against the cluster.
/// The client is always removed from the fixture afterwards.
/// </summary>
[Fact]
public async Task ClientsCanCallCluster()
{
    await Trace(async () =>
        {
            // Dispose the source once the test body is done so its timer is released.
            using var timeoutSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));
            var timeout = timeoutSource.Token;

            var clientNode = await ClusterFixture.SpawnClient();

            try
            {
                // NOTE(review): CI reported this assertion failing ("Expected
                // clientNode.JoinedCluster.IsCompletedSuccessfully to be true, but found False")
                // for the Redis identity fixtures on net6.0/net7.0 — possibly the join
                // completes slightly after the client start returns; confirm.
                clientNode.JoinedCluster.IsCompletedSuccessfully.Should().BeTrue();

                var timer = Stopwatch.StartNew();
                await PingPong(clientNode, "client-unicorn", timeout);
                timer.Stop();
                _testOutputHelper.WriteLine($"Spawned 1 actor in {timer.Elapsed}");
            }
            finally
            {
                // Shut the client down on success as well as on failure so it is not
                // left behind in the fixture; a failure still propagates after cleanup.
                await ClusterFixture.RemoveNode(clientNode);
            }
        }, _testOutputHelper
    );
}

[Fact]
public async Task TopologiesShouldHaveConsensus()
{
Expand Down Expand Up @@ -217,7 +244,7 @@ await Trace(async () =>
_testOutputHelper.WriteLine("Removing node " + toBeRemoved.System.Id + " / " + toBeRemoved.System.Address);
await ClusterFixture.RemoveNode(toBeRemoved);
_testOutputHelper.WriteLine("Removed node " + toBeRemoved.System.Id + " / " + toBeRemoved.System.Address);
await ClusterFixture.SpawnNode();
await ClusterFixture.SpawnMember();

await CanGetResponseFromAllIdsOnAllNodes(ids, Members, 20000);

Expand Down Expand Up @@ -256,7 +283,7 @@ await Trace(async () =>
_testOutputHelper.WriteLine("Terminating node");
await ClusterFixture.RemoveNode(victim);
_testOutputHelper.WriteLine("Spawning node");
await ClusterFixture.SpawnNode();
await ClusterFixture.SpawnMember();
await Task.Delay(1000);
cts.Cancel();
await worker;
Expand Down
2 changes: 1 addition & 1 deletion tests/Proto.Cluster.Tests/GossipTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public async Task CompositeConsensusWorks()

afterSettingMatchingState.value.Should().Be(initialTopologyHash);

await clusterFixture.SpawnNode();
await clusterFixture.SpawnMember();
await Task.Delay(2000); // Allow topology state to propagate

var afterChangingTopology =
Expand Down
2 changes: 1 addition & 1 deletion tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public async Task ShouldRetryRequestOnDeadLetterResponseRegardlessOfResponseType
await member.RequestAsync<Pong>(identity, EchoActor.Kind, new Ping(), CancellationTokens.FromSeconds(1));

// pretend we have an invalid PID in the cache
var otherMember = await fixture.SpawnNode();
var otherMember = await fixture.SpawnMember();
if (member.PidCache.TryGet(ClusterIdentity.Create(identity, EchoActor.Kind), out var pid))
{
var newPid = PID.FromAddress(otherMember.System.Address, pid.Id);
Expand Down

0 comments on commit 2e0acd1

Please sign in to comment.