Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

formatting #2042

Merged
merged 1 commit into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 61 additions & 47 deletions benchmarks/AutoClusterBenchmark/Configuration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,17 @@ IIdentityLookup identityLookup

private static GrpcNetRemoteConfig GetRemoteConfig()
{
var portStr = Environment.GetEnvironmentVariable("PROTOPORT") ?? $"{RemoteConfigBase.AnyFreePort}";
var portStr =
Environment.GetEnvironmentVariable("PROTOPORT") ?? $"{RemoteConfigBase.AnyFreePort}";
var port = int.Parse(portStr);
var host = Environment.GetEnvironmentVariable("PROTOHOST") ?? RemoteConfigBase.Localhost;
var advertisedHost = Environment.GetEnvironmentVariable("PROTOHOSTPUBLIC");

var remoteConfig = GrpcNetRemoteConfig
.BindTo(host, port)
.WithAdvertisedHost(advertisedHost)
.WithChannelOptions(new GrpcChannelOptions
.WithChannelOptions(
new GrpcChannelOptions
{
CompressionProviders = new[]
{
Expand All @@ -63,12 +65,13 @@ private static GrpcNetRemoteConfig GetRemoteConfig()

private static InMemAgent Agent = null!;

private static TestProviderOptions Options = new()
{
DeregisterCritical = TimeSpan.FromSeconds(10),
RefreshTtl = TimeSpan.FromSeconds(5),
ServiceTtl = TimeSpan.FromSeconds(3)
};
private static TestProviderOptions Options =
new()
{
DeregisterCritical = TimeSpan.FromSeconds(10),
RefreshTtl = TimeSpan.FromSeconds(5),
ServiceTtl = TimeSpan.FromSeconds(3)
};

public static void ResetAgent()
{
Expand All @@ -77,36 +80,40 @@ public static void ResetAgent()

private static IClusterProvider ClusterProvider() => new TestProvider(Options, Agent);

private static IIdentityLookup GetIdentityLookup() => new PartitionIdentityLookup(
new PartitionConfig
{
RebalanceActivationsCompletionTimeout = TimeSpan.FromSeconds(5),
GetPidTimeout = TimeSpan.FromSeconds(5),
RebalanceRequestTimeout = TimeSpan.FromSeconds(1),
Mode = PartitionIdentityLookup.Mode.Push,
}
);
private static IIdentityLookup GetIdentityLookup() =>
new PartitionIdentityLookup(
new PartitionConfig
{
RebalanceActivationsCompletionTimeout = TimeSpan.FromSeconds(5),
GetPidTimeout = TimeSpan.FromSeconds(5),
RebalanceRequestTimeout = TimeSpan.FromSeconds(1),
Mode = PartitionIdentityLookup.Mode.Push,
}
);

public static async Task<Cluster> SpawnMember()
{
var system = new ActorSystem(GetMemberActorSystemConfig()
);
system.EventStream.Subscribe<ClusterTopology>(e => {
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine($"M:{system.Id}-{system.Address}-ClusterTopology:{e.GetMembershipHashCode()}");
Console.ResetColor();
}
);
system.EventStream.Subscribe<LeaderElected>(e => {
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine($"M:{system.Id}-{system.Address}-Leader:{e.Leader.Id}");
Console.ResetColor();
}
);
var system = new ActorSystem(GetMemberActorSystemConfig());
system.EventStream.Subscribe<ClusterTopology>(e =>
{
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine(
$"M:{system.Id}-{system.Address}-ClusterTopology:{e.GetMembershipHashCode()}"
);
Console.ResetColor();
});
system.EventStream.Subscribe<LeaderElected>(e =>
{
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine($"M:{system.Id}-{system.Address}-Leader:{e.Leader.Id}");
Console.ResetColor();
});
var clusterProvider = ClusterProvider();
var identity = GetIdentityLookup();

system.WithRemote(GetRemoteConfig()).WithCluster(GetClusterConfig(clusterProvider, identity));
system
.WithRemote(GetRemoteConfig())
.WithCluster(GetClusterConfig(clusterProvider, identity));
await system.Cluster().StartMemberAsync();
return system.Cluster();
}
Expand All @@ -124,28 +131,35 @@ private static ActorSystemConfig GetMemberActorSystemConfig()

public static async Task<Cluster> SpawnClient()
{
var config = new ActorSystemConfig().WithDeadLetterThrottleCount(3)
var config = new ActorSystemConfig()
.WithDeadLetterThrottleCount(3)
.WithSharedFutures()
.WithDeadLetterThrottleInterval(TimeSpan.FromSeconds(1))
.WithDeadLetterRequestLogging(false);
var system = new ActorSystem(EnableTracing ? config.WithConfigureProps(props => props.WithTracing()) : config);
system.EventStream.Subscribe<ClusterTopology>(e => {
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine($"C:{system.Id}-{system.Address}-ClusterTopology:{e.GetMembershipHashCode()}");
Console.ResetColor();
}
);
system.EventStream.Subscribe<LeaderElected>(e => {
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine($"C:{system.Id}-{system.Address}-Leader:{e.Leader.Id}");
Console.ResetColor();
}
var system = new ActorSystem(
EnableTracing ? config.WithConfigureProps(props => props.WithTracing()) : config
);
system.EventStream.Subscribe<ClusterTopology>(e =>
{
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine(
$"C:{system.Id}-{system.Address}-ClusterTopology:{e.GetMembershipHashCode()}"
);
Console.ResetColor();
});
system.EventStream.Subscribe<LeaderElected>(e =>
{
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine($"C:{system.Id}-{system.Address}-Leader:{e.Leader.Id}");
Console.ResetColor();
});
var clusterProvider = ClusterProvider();
var identity = GetIdentityLookup();
system.WithRemote(GetRemoteConfig()).WithCluster(GetClusterConfig(clusterProvider, identity));
system
.WithRemote(GetRemoteConfig())
.WithCluster(GetClusterConfig(clusterProvider, identity));

await system.Cluster().StartClientAsync();
return system.Cluster();
}
}
}
40 changes: 25 additions & 15 deletions benchmarks/AutoClusterBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static async Task Main(string[] args)
{
ThreadPool.SetMinThreads(0, 0);

foreach (var batchSize in new[] {100, 150, 200, 250, 300})
foreach (var batchSize in new[] { 100, 150, 200, 250, 300 })
{
Configuration.ResetAgent();
ResetCounters();
Expand All @@ -41,7 +41,8 @@ public static async Task Main(string[] args)

var elapsed = await RunWorkers(
() => new RunMemberInProcGraceful(),
() => RunBatchClient(batchSize, cluster));
() => RunBatchClient(batchSize, cluster)
);
var tps = _requestCount / elapsed.TotalMilliseconds * 1000;
Console.WriteLine();
Console.WriteLine($"Batch Size:\t{batchSize}");
Expand All @@ -62,7 +63,12 @@ private static void ResetCounters()
_successCount = 0;
}

private static async Task SendRequest(Cluster cluster, ClusterIdentity id, CancellationToken cancellationToken, ISenderContext? context = null)
private static async Task SendRequest(
Cluster cluster,
ClusterIdentity id,
CancellationToken cancellationToken,
ISenderContext? context = null
)
{
Interlocked.Increment(ref _requestCount);

Expand Down Expand Up @@ -90,7 +96,7 @@ private static async Task SendRequest(Cluster cluster, ClusterIdentity id, Cance
}
catch (TimeoutException)
{
// ignored
// ignored
}

OnError();
Expand All @@ -102,7 +108,8 @@ private static async Task SendRequest(Cluster cluster, ClusterIdentity id, Cance

void OnError()
{
if (cluster.System.Shutdown.IsCancellationRequested) return;
if (cluster.System.Shutdown.IsCancellationRequested)
return;

Interlocked.Increment(ref _failureCount);

Expand All @@ -124,16 +131,16 @@ private static Task RunBatchClient(int batchSize, Cluster cluster)

var logger = Log.CreateLogger(nameof(Program));

_ = SafeTask.Run(async () => {
var rnd = new Random();
var semaphore = new AsyncSemaphore(5);
_ = SafeTask.Run(async () =>
{
var rnd = new Random();
var semaphore = new AsyncSemaphore(5);

while (!cluster.System.Shutdown.IsCancellationRequested)
{
await semaphore.WaitAsync(() => RunBatch(rnd, cluster));
}
while (!cluster.System.Shutdown.IsCancellationRequested)
{
await semaphore.WaitAsync(() => RunBatch(rnd, cluster));
}
);
});

async Task RunBatch(Random? rnd, Cluster cluster)
{
Expand Down Expand Up @@ -164,7 +171,10 @@ async Task RunBatch(Random? rnd, Cluster cluster)
return Task.CompletedTask;
}

private static async Task<TimeSpan> RunWorkers(Func<IRunMember> memberFactory, Func<Task> startClient)
private static async Task<TimeSpan> RunWorkers(
Func<IRunMember> memberFactory,
Func<Task> startClient
)
{
var followers = new List<IRunMember>();

Expand Down Expand Up @@ -205,4 +215,4 @@ private static async Task<TimeSpan> RunWorkers(Func<IRunMember> memberFactory, F
sw.Stop();
return sw.Elapsed;
}
}
}
2 changes: 1 addition & 1 deletion benchmarks/AutoClusterBenchmark/Runner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ public class RunMemberInProcGraceful : IRunMember
public async Task Start() => _cluster = await Configuration.SpawnMember();

public async Task Kill() => await _cluster!.ShutdownAsync();
}
}
2 changes: 1 addition & 1 deletion benchmarks/AutoClusterBenchmark/WorkerActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ public Task ReceiveAsync(IContext ctx)

return Task.CompletedTask;
}
}
}
3 changes: 2 additions & 1 deletion benchmarks/ClusterBenchmark/ClrMessages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
namespace ClusterExperiment1;

public record HelloRequestPoco();
public record HelloResponsePoco();

public record HelloResponsePoco();
Loading