Skip to content

Commit

Permalink
Don't test for client functionality when unsupported by provider
Browse files Browse the repository at this point in the history
  • Loading branch information
mhelleborg committed Nov 15, 2023
1 parent c504efb commit 6b6b758
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 34 deletions.
48 changes: 24 additions & 24 deletions tests/Proto.Cluster.Tests/ClusterFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public interface IClusterFixture
{
IList<Cluster> Members { get; }

bool SupportsClients { get; }

LogStore LogStore { get; }
int ClusterSize { get; }

Expand All @@ -56,7 +58,7 @@ public abstract class ClusterFixture : IAsyncLifetime, IClusterFixture, IAsyncDi
{
private static readonly object Lock = new();


public const string InvalidIdentity = "invalid";
private readonly Func<ClusterConfig, ClusterConfig>? _configure;
private readonly ILogger _logger = Log.CreateLogger(nameof(GetType));
Expand All @@ -71,14 +73,14 @@ static ClusterFixture()
{
TracingSettings.OpenTelemetryUrl = Environment.GetEnvironmentVariable("OPENTELEMETRY_URL");
TracingSettings.TraceViewUrl = Environment.GetEnvironmentVariable("TRACEVIEW_URL");
// TracingSettings.OpenTelemetryUrl = "http://Localhost:4317";
// TracingSettings.OpenTelemetryUrl = "http://Localhost:4317";
TracingSettings.EnableTracing = TracingSettings.OpenTelemetryUrl != null;

//TODO: check if this helps low resource envs like github actions.
ThreadPool.SetMinThreads(40, 40);
}

protected ClusterFixture( int clusterSize, Func<ClusterConfig, ClusterConfig>? configure = null)
protected ClusterFixture(int clusterSize, Func<ClusterConfig, ClusterConfig>? configure = null)
{
_reporter = new GithubActionsReporter(GetType().Name);
ClusterSize = clusterSize;
Expand All @@ -89,11 +91,14 @@ protected ClusterFixture( int clusterSize, Func<ClusterConfig, ClusterConfig>? c
// ReSharper disable once HeuristicUnreachableCode
if (TracingSettings.EnableTracing)
{
InitOpenTelemetryTracing();
InitOpenTelemetryTracing();
}
#pragma warning restore CS0162
}

public virtual bool SupportsClients => true;


protected virtual ClusterKind[] ClusterKinds => new[]
{
new ClusterKind(EchoActor.Kind, EchoActor.Props.WithClusterRequestDeduplication()),
Expand Down Expand Up @@ -124,7 +129,7 @@ public async Task DisposeAsync()
try
{
_tracerProvider?.ForceFlush();

await _reporter.WriteReportFile();

await OnDisposing();
Expand Down Expand Up @@ -156,11 +161,10 @@ private async Task WaitForMembersToShutdown()
_logger.LogInformation("Shutting down cluster member {MemberId}", cluster.System.Id);

var done = await task.WaitUpTo(TimeSpan.FromSeconds(5));
if (! done)
if (!done)
{
_logger.LogWarning("Failed to shutdown cluster member {MemberId} gracefully", cluster.System.Id);
}

}
catch (Exception e)
{
Expand Down Expand Up @@ -208,7 +212,7 @@ public async Task<Cluster> SpawnMember()

return newMember;
}

/// <summary>
/// Spawns a node, adds it to the cluster and member list
/// </summary>
Expand Down Expand Up @@ -245,7 +249,7 @@ private static void InitOpenTelemetryTracing()
{
options
.SetResourceBuilder(builder)
.AddOtlpExporter( (OtlpExporterOptions o) =>
.AddOtlpExporter((OtlpExporterOptions o) =>
{
o.Endpoint = endpoint;
o.ExportProcessorType = ExportProcessorType.Batch;
Expand Down Expand Up @@ -283,10 +287,10 @@ private async Task<IList<Cluster>> SpawnClusterNodes(
{
var tasks = Enumerable.Range(0, count)
.Select(_ => SpawnClusterMember(configure));

var res = (await Task.WhenAll(tasks)).ToList();


var consensus = res.Select(m => m.MemberList.TopologyConsensus(CancellationTokens.FromSeconds(10)));
var x = await Task.WhenAll(consensus);
if (x.Any(c => !c.consensus))
Expand Down Expand Up @@ -314,22 +318,19 @@ protected virtual async Task<Cluster> SpawnClusterMember(Func<ClusterConfig, Clu

var logger = system.Logger()?.BeginScope<EventStream>();

system.EventStream.Subscribe<object>(e =>
{
logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e);
}
system.EventStream.Subscribe<object>(e => { logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e); }
);

var remoteConfig = GrpcNetRemoteConfig.BindToLocalhost().WithProtoMessages(MessagesReflection.Descriptor);
var _ = new GrpcNetRemote(system, remoteConfig);

var cluster = new Cluster(system, config);

await cluster.StartMemberAsync();

return cluster;
}

protected virtual async Task<Cluster> SpawnClusterClient(Func<ClusterConfig, ClusterConfig>? configure)
{
var config = ClusterConfig.Setup(
Expand All @@ -346,17 +347,14 @@ protected virtual async Task<Cluster> SpawnClusterClient(Func<ClusterConfig, Clu

var logger = system.Logger()?.BeginScope<EventStream>();

system.EventStream.Subscribe<object>(e =>
{
logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e);
}
system.EventStream.Subscribe<object>(e => { logger?.LogDebug("EventStream {MessageType}:{MessagePayload}", e.GetType().Name, e); }
);

var remoteConfig = GrpcNetRemoteConfig.BindToLocalhost().WithProtoMessages(MessagesReflection.Descriptor);
var _ = new GrpcNetRemote(system, remoteConfig);

var cluster = new Cluster(system, config);

await cluster.StartClientAsync();

return cluster;
Expand All @@ -370,7 +368,7 @@ protected virtual ActorSystemConfig GetActorSystemConfig()
return TracingSettings.EnableTracing
? actorSystemConfig
.WithConfigureProps(props => props.WithTracing().WithLoggingContextDecorator(_logger).WithLoggingContextDecorator(_logger))
.WithConfigureSystemProps((name,props) =>
.WithConfigureSystemProps((name, props) =>
{
// if (name == "$gossip")
// return props;
Expand Down Expand Up @@ -480,4 +478,6 @@ public SingleNodeProviderFixture() : base(1, config => config.WithActorRequestTi
protected override IClusterProvider GetClusterProvider() => new SingleNodeProvider();

protected override IIdentityLookup GetIdentityLookup(string clusterName) => new SingleNodeLookup();

public override bool SupportsClients => false;
}
21 changes: 11 additions & 10 deletions tests/Proto.Cluster.Tests/ClusterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ protected ClusterTests(ITestOutputHelper testOutputHelper, IClusterFixture clust
[Fact]
public void ClusterMembersMatch()
{

var memberSet = Members.First().MemberList.GetMembers();

memberSet.Should().NotBeEmpty();
Expand All @@ -51,21 +50,24 @@ await Trace(async () =>
_testOutputHelper.WriteLine($"Spawned 1 actor in {timer.Elapsed}");
}, _testOutputHelper);
}

[Fact]
public async Task ClientsCanCallCluster()
{
if (!ClusterFixture.SupportsClients)
return;

await Trace(async () =>
{
var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token;

var clientNode = await ClusterFixture.SpawnClient();
var clientNode = await ClusterFixture.SpawnClient();

try
{
await clientNode.JoinedCluster.WaitAsync(timeout);
clientNode.JoinedCluster.IsCompletedSuccessfully.Should().BeTrue();

var timer = Stopwatch.StartNew();
await PingPong(clientNode, "client-unicorn", timeout);
timer.Stop();
Expand All @@ -76,19 +78,18 @@ await Trace(async () =>
await ClusterFixture.RemoveNode(clientNode);
throw;
}

}, _testOutputHelper);
}

[Fact]
public async Task TopologiesShouldHaveConsensus()
{
await Trace(async () =>
{
var consensus = await Task
.WhenAll(Members.Select(member =>
member.MemberList.TopologyConsensus(CancellationTokens.FromSeconds(20))))
.WaitUpTo(TimeSpan.FromSeconds(20))
.WhenAll(Members.Select(member =>
member.MemberList.TopologyConsensus(CancellationTokens.FromSeconds(20))))
.WaitUpTo(TimeSpan.FromSeconds(20))
;

_testOutputHelper.WriteLine(await Members.DumpClusterState());
Expand Down Expand Up @@ -496,7 +497,7 @@ private async Task PingPong(
string id,
CancellationToken token = default,
string kind = EchoActor.Kind,
ISenderContext context= null
ISenderContext context = null
)
{
await Task.Yield();
Expand Down

0 comments on commit 6b6b758

Please sign in to comment.