Skip to content

Commit

Permalink
formatting (#2042)
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing authored Sep 7, 2023
1 parent 682fc30 commit cec3bf2
Show file tree
Hide file tree
Showing 58 changed files with 997 additions and 702 deletions.
108 changes: 61 additions & 47 deletions benchmarks/AutoClusterBenchmark/Configuration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,17 @@ IIdentityLookup identityLookup

private static GrpcNetRemoteConfig GetRemoteConfig()
{
var portStr = Environment.GetEnvironmentVariable("PROTOPORT") ?? $"{RemoteConfigBase.AnyFreePort}";
var portStr =
Environment.GetEnvironmentVariable("PROTOPORT") ?? $"{RemoteConfigBase.AnyFreePort}";
var port = int.Parse(portStr);
var host = Environment.GetEnvironmentVariable("PROTOHOST") ?? RemoteConfigBase.Localhost;
var advertisedHost = Environment.GetEnvironmentVariable("PROTOHOSTPUBLIC");

var remoteConfig = GrpcNetRemoteConfig
.BindTo(host, port)
.WithAdvertisedHost(advertisedHost)
.WithChannelOptions(new GrpcChannelOptions
.WithChannelOptions(
new GrpcChannelOptions
{
CompressionProviders = new[]
{
Expand All @@ -63,12 +65,13 @@ private static GrpcNetRemoteConfig GetRemoteConfig()

private static InMemAgent Agent = null!;

private static TestProviderOptions Options = new()
{
DeregisterCritical = TimeSpan.FromSeconds(10),
RefreshTtl = TimeSpan.FromSeconds(5),
ServiceTtl = TimeSpan.FromSeconds(3)
};
private static TestProviderOptions Options =
new()
{
DeregisterCritical = TimeSpan.FromSeconds(10),
RefreshTtl = TimeSpan.FromSeconds(5),
ServiceTtl = TimeSpan.FromSeconds(3)
};

public static void ResetAgent()
{
Expand All @@ -77,36 +80,40 @@ public static void ResetAgent()

private static IClusterProvider ClusterProvider() => new TestProvider(Options, Agent);

private static IIdentityLookup GetIdentityLookup() => new PartitionIdentityLookup(
new PartitionConfig
{
RebalanceActivationsCompletionTimeout = TimeSpan.FromSeconds(5),
GetPidTimeout = TimeSpan.FromSeconds(5),
RebalanceRequestTimeout = TimeSpan.FromSeconds(1),
Mode = PartitionIdentityLookup.Mode.Push,
}
);
private static IIdentityLookup GetIdentityLookup() =>
new PartitionIdentityLookup(
new PartitionConfig
{
RebalanceActivationsCompletionTimeout = TimeSpan.FromSeconds(5),
GetPidTimeout = TimeSpan.FromSeconds(5),
RebalanceRequestTimeout = TimeSpan.FromSeconds(1),
Mode = PartitionIdentityLookup.Mode.Push,
}
);

public static async Task<Cluster> SpawnMember()
{
var system = new ActorSystem(GetMemberActorSystemConfig()
);
system.EventStream.Subscribe<ClusterTopology>(e => {
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine($"M:{system.Id}-{system.Address}-ClusterTopology:{e.GetMembershipHashCode()}");
Console.ResetColor();
}
);
system.EventStream.Subscribe<LeaderElected>(e => {
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine($"M:{system.Id}-{system.Address}-Leader:{e.Leader.Id}");
Console.ResetColor();
}
);
var system = new ActorSystem(GetMemberActorSystemConfig());
system.EventStream.Subscribe<ClusterTopology>(e =>
{
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine(
$"M:{system.Id}-{system.Address}-ClusterTopology:{e.GetMembershipHashCode()}"
);
Console.ResetColor();
});
system.EventStream.Subscribe<LeaderElected>(e =>
{
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine($"M:{system.Id}-{system.Address}-Leader:{e.Leader.Id}");
Console.ResetColor();
});
var clusterProvider = ClusterProvider();
var identity = GetIdentityLookup();

system.WithRemote(GetRemoteConfig()).WithCluster(GetClusterConfig(clusterProvider, identity));
system
.WithRemote(GetRemoteConfig())
.WithCluster(GetClusterConfig(clusterProvider, identity));
await system.Cluster().StartMemberAsync();
return system.Cluster();
}
Expand All @@ -124,28 +131,35 @@ private static ActorSystemConfig GetMemberActorSystemConfig()

public static async Task<Cluster> SpawnClient()
{
var config = new ActorSystemConfig().WithDeadLetterThrottleCount(3)
var config = new ActorSystemConfig()
.WithDeadLetterThrottleCount(3)
.WithSharedFutures()
.WithDeadLetterThrottleInterval(TimeSpan.FromSeconds(1))
.WithDeadLetterRequestLogging(false);
var system = new ActorSystem(EnableTracing ? config.WithConfigureProps(props => props.WithTracing()) : config);
system.EventStream.Subscribe<ClusterTopology>(e => {
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine($"C:{system.Id}-{system.Address}-ClusterTopology:{e.GetMembershipHashCode()}");
Console.ResetColor();
}
);
system.EventStream.Subscribe<LeaderElected>(e => {
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine($"C:{system.Id}-{system.Address}-Leader:{e.Leader.Id}");
Console.ResetColor();
}
var system = new ActorSystem(
EnableTracing ? config.WithConfigureProps(props => props.WithTracing()) : config
);
system.EventStream.Subscribe<ClusterTopology>(e =>
{
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine(
$"C:{system.Id}-{system.Address}-ClusterTopology:{e.GetMembershipHashCode()}"
);
Console.ResetColor();
});
system.EventStream.Subscribe<LeaderElected>(e =>
{
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine($"C:{system.Id}-{system.Address}-Leader:{e.Leader.Id}");
Console.ResetColor();
});
var clusterProvider = ClusterProvider();
var identity = GetIdentityLookup();
system.WithRemote(GetRemoteConfig()).WithCluster(GetClusterConfig(clusterProvider, identity));
system
.WithRemote(GetRemoteConfig())
.WithCluster(GetClusterConfig(clusterProvider, identity));

await system.Cluster().StartClientAsync();
return system.Cluster();
}
}
}
40 changes: 25 additions & 15 deletions benchmarks/AutoClusterBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static async Task Main(string[] args)
{
ThreadPool.SetMinThreads(0, 0);

foreach (var batchSize in new[] {100, 150, 200, 250, 300})
foreach (var batchSize in new[] { 100, 150, 200, 250, 300 })
{
Configuration.ResetAgent();
ResetCounters();
Expand All @@ -41,7 +41,8 @@ public static async Task Main(string[] args)

var elapsed = await RunWorkers(
() => new RunMemberInProcGraceful(),
() => RunBatchClient(batchSize, cluster));
() => RunBatchClient(batchSize, cluster)
);
var tps = _requestCount / elapsed.TotalMilliseconds * 1000;
Console.WriteLine();
Console.WriteLine($"Batch Size:\t{batchSize}");
Expand All @@ -62,7 +63,12 @@ private static void ResetCounters()
_successCount = 0;
}

private static async Task SendRequest(Cluster cluster, ClusterIdentity id, CancellationToken cancellationToken, ISenderContext? context = null)
private static async Task SendRequest(
Cluster cluster,
ClusterIdentity id,
CancellationToken cancellationToken,
ISenderContext? context = null
)
{
Interlocked.Increment(ref _requestCount);

Expand Down Expand Up @@ -90,7 +96,7 @@ private static async Task SendRequest(Cluster cluster, ClusterIdentity id, Cance
}
catch (TimeoutException)
{
// ignored
// ignored
}

OnError();
Expand All @@ -102,7 +108,8 @@ private static async Task SendRequest(Cluster cluster, ClusterIdentity id, Cance

void OnError()
{
if (cluster.System.Shutdown.IsCancellationRequested) return;
if (cluster.System.Shutdown.IsCancellationRequested)
return;

Interlocked.Increment(ref _failureCount);

Expand All @@ -124,16 +131,16 @@ private static Task RunBatchClient(int batchSize, Cluster cluster)

var logger = Log.CreateLogger(nameof(Program));

_ = SafeTask.Run(async () => {
var rnd = new Random();
var semaphore = new AsyncSemaphore(5);
_ = SafeTask.Run(async () =>
{
var rnd = new Random();
var semaphore = new AsyncSemaphore(5);

while (!cluster.System.Shutdown.IsCancellationRequested)
{
await semaphore.WaitAsync(() => RunBatch(rnd, cluster));
}
while (!cluster.System.Shutdown.IsCancellationRequested)
{
await semaphore.WaitAsync(() => RunBatch(rnd, cluster));
}
);
});

async Task RunBatch(Random? rnd, Cluster cluster)
{
Expand Down Expand Up @@ -164,7 +171,10 @@ async Task RunBatch(Random? rnd, Cluster cluster)
return Task.CompletedTask;
}

private static async Task<TimeSpan> RunWorkers(Func<IRunMember> memberFactory, Func<Task> startClient)
private static async Task<TimeSpan> RunWorkers(
Func<IRunMember> memberFactory,
Func<Task> startClient
)
{
var followers = new List<IRunMember>();

Expand Down Expand Up @@ -205,4 +215,4 @@ private static async Task<TimeSpan> RunWorkers(Func<IRunMember> memberFactory, F
sw.Stop();
return sw.Elapsed;
}
}
}
2 changes: 1 addition & 1 deletion benchmarks/AutoClusterBenchmark/Runner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ public class RunMemberInProcGraceful : IRunMember
public async Task Start() => _cluster = await Configuration.SpawnMember();

public async Task Kill() => await _cluster!.ShutdownAsync();
}
}
2 changes: 1 addition & 1 deletion benchmarks/AutoClusterBenchmark/WorkerActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ public Task ReceiveAsync(IContext ctx)

return Task.CompletedTask;
}
}
}
3 changes: 2 additions & 1 deletion benchmarks/ClusterBenchmark/ClrMessages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
namespace ClusterExperiment1;

public record HelloRequestPoco();
public record HelloResponsePoco();

public record HelloResponsePoco();
Loading

0 comments on commit cec3bf2

Please sign in to comment.