diff --git a/benchmarks/AutoClusterBenchmark/Configuration.cs b/benchmarks/AutoClusterBenchmark/Configuration.cs index de35b2f80e..9cd1c658d3 100644 --- a/benchmarks/AutoClusterBenchmark/Configuration.cs +++ b/benchmarks/AutoClusterBenchmark/Configuration.cs @@ -39,7 +39,8 @@ IIdentityLookup identityLookup private static GrpcNetRemoteConfig GetRemoteConfig() { - var portStr = Environment.GetEnvironmentVariable("PROTOPORT") ?? $"{RemoteConfigBase.AnyFreePort}"; + var portStr = + Environment.GetEnvironmentVariable("PROTOPORT") ?? $"{RemoteConfigBase.AnyFreePort}"; var port = int.Parse(portStr); var host = Environment.GetEnvironmentVariable("PROTOHOST") ?? RemoteConfigBase.Localhost; var advertisedHost = Environment.GetEnvironmentVariable("PROTOHOSTPUBLIC"); @@ -47,7 +48,8 @@ private static GrpcNetRemoteConfig GetRemoteConfig() var remoteConfig = GrpcNetRemoteConfig .BindTo(host, port) .WithAdvertisedHost(advertisedHost) - .WithChannelOptions(new GrpcChannelOptions + .WithChannelOptions( + new GrpcChannelOptions { CompressionProviders = new[] { @@ -63,12 +65,13 @@ private static GrpcNetRemoteConfig GetRemoteConfig() private static InMemAgent Agent = null!; - private static TestProviderOptions Options = new() - { - DeregisterCritical = TimeSpan.FromSeconds(10), - RefreshTtl = TimeSpan.FromSeconds(5), - ServiceTtl = TimeSpan.FromSeconds(3) - }; + private static TestProviderOptions Options = + new() + { + DeregisterCritical = TimeSpan.FromSeconds(10), + RefreshTtl = TimeSpan.FromSeconds(5), + ServiceTtl = TimeSpan.FromSeconds(3) + }; public static void ResetAgent() { @@ -77,36 +80,40 @@ public static void ResetAgent() private static IClusterProvider ClusterProvider() => new TestProvider(Options, Agent); - private static IIdentityLookup GetIdentityLookup() => new PartitionIdentityLookup( - new PartitionConfig - { - RebalanceActivationsCompletionTimeout = TimeSpan.FromSeconds(5), - GetPidTimeout = TimeSpan.FromSeconds(5), - RebalanceRequestTimeout = TimeSpan.FromSeconds(1), - Mode = PartitionIdentityLookup.Mode.Push, - } - ); + private static IIdentityLookup GetIdentityLookup() => + new PartitionIdentityLookup( + new PartitionConfig + { + RebalanceActivationsCompletionTimeout = TimeSpan.FromSeconds(5), + GetPidTimeout = TimeSpan.FromSeconds(5), + RebalanceRequestTimeout = TimeSpan.FromSeconds(1), + Mode = PartitionIdentityLookup.Mode.Push, + } + ); public static async Task SpawnMember() { - var system = new ActorSystem(GetMemberActorSystemConfig() - ); - system.EventStream.Subscribe(e => { - Console.ForegroundColor = ConsoleColor.Yellow; - Console.WriteLine($"M:{system.Id}-{system.Address}-ClusterTopology:{e.GetMembershipHashCode()}"); - Console.ResetColor(); - } - ); - system.EventStream.Subscribe(e => { - Console.ForegroundColor = ConsoleColor.Cyan; - Console.WriteLine($"M:{system.Id}-{system.Address}-Leader:{e.Leader.Id}"); - Console.ResetColor(); - } - ); + var system = new ActorSystem(GetMemberActorSystemConfig()); + system.EventStream.Subscribe(e => + { + Console.ForegroundColor = ConsoleColor.Yellow; + Console.WriteLine( + $"M:{system.Id}-{system.Address}-ClusterTopology:{e.GetMembershipHashCode()}" + ); + Console.ResetColor(); + }); + system.EventStream.Subscribe(e => + { + Console.ForegroundColor = ConsoleColor.Cyan; + Console.WriteLine($"M:{system.Id}-{system.Address}-Leader:{e.Leader.Id}"); + Console.ResetColor(); + }); var clusterProvider = ClusterProvider(); var identity = GetIdentityLookup(); - system.WithRemote(GetRemoteConfig()).WithCluster(GetClusterConfig(clusterProvider, identity)); + system + .WithRemote(GetRemoteConfig()) + .WithCluster(GetClusterConfig(clusterProvider, identity)); await system.Cluster().StartMemberAsync(); return system.Cluster(); } @@ -124,28 +131,35 @@ private static ActorSystemConfig GetMemberActorSystemConfig() public static async Task SpawnClient() { - var config = new ActorSystemConfig().WithDeadLetterThrottleCount(3) + var config = new ActorSystemConfig() + .WithDeadLetterThrottleCount(3) .WithSharedFutures() .WithDeadLetterThrottleInterval(TimeSpan.FromSeconds(1)) .WithDeadLetterRequestLogging(false); - var system = new ActorSystem(EnableTracing ? config.WithConfigureProps(props => props.WithTracing()) : config); - system.EventStream.Subscribe(e => { - Console.ForegroundColor = ConsoleColor.Yellow; - Console.WriteLine($"C:{system.Id}-{system.Address}-ClusterTopology:{e.GetMembershipHashCode()}"); - Console.ResetColor(); - } - ); - system.EventStream.Subscribe(e => { - Console.ForegroundColor = ConsoleColor.Cyan; - Console.WriteLine($"C:{system.Id}-{system.Address}-Leader:{e.Leader.Id}"); - Console.ResetColor(); - } + var system = new ActorSystem( + EnableTracing ? config.WithConfigureProps(props => props.WithTracing()) : config ); + system.EventStream.Subscribe(e => + { + Console.ForegroundColor = ConsoleColor.Yellow; + Console.WriteLine( + $"C:{system.Id}-{system.Address}-ClusterTopology:{e.GetMembershipHashCode()}" + ); + Console.ResetColor(); + }); + system.EventStream.Subscribe(e => + { + Console.ForegroundColor = ConsoleColor.Cyan; + Console.WriteLine($"C:{system.Id}-{system.Address}-Leader:{e.Leader.Id}"); + Console.ResetColor(); + }); var clusterProvider = ClusterProvider(); var identity = GetIdentityLookup(); - system.WithRemote(GetRemoteConfig()).WithCluster(GetClusterConfig(clusterProvider, identity)); + system + .WithRemote(GetRemoteConfig()) + .WithCluster(GetClusterConfig(clusterProvider, identity)); await system.Cluster().StartClientAsync(); return system.Cluster(); } -} \ No newline at end of file +} diff --git a/benchmarks/AutoClusterBenchmark/Program.cs b/benchmarks/AutoClusterBenchmark/Program.cs index 7e3b6c71d6..4811b1a13e 100644 --- a/benchmarks/AutoClusterBenchmark/Program.cs +++ b/benchmarks/AutoClusterBenchmark/Program.cs @@ -32,7 +32,7 @@ public static async Task Main(string[] args) { ThreadPool.SetMinThreads(0, 0); - foreach (var batchSize in new[] {100, 150, 200, 250, 300}) + foreach (var batchSize in new[] { 100, 150, 200, 250, 300 }) { Configuration.ResetAgent(); ResetCounters(); @@ -41,7 +41,8 @@ public static async Task Main(string[] args) var elapsed = await RunWorkers( () => new RunMemberInProcGraceful(), - () => RunBatchClient(batchSize, cluster)); + () => RunBatchClient(batchSize, cluster) + ); var tps = _requestCount / elapsed.TotalMilliseconds * 1000; Console.WriteLine(); Console.WriteLine($"Batch Size:\t{batchSize}"); @@ -62,7 +63,12 @@ private static void ResetCounters() _successCount = 0; } - private static async Task SendRequest(Cluster cluster, ClusterIdentity id, CancellationToken cancellationToken, ISenderContext? context = null) + private static async Task SendRequest( + Cluster cluster, + ClusterIdentity id, + CancellationToken cancellationToken, + ISenderContext? context = null + ) { Interlocked.Increment(ref _requestCount); @@ -90,7 +96,7 @@ private static async Task SendRequest(Cluster cluster, ClusterIdentity id, Cance } catch (TimeoutException) { - // ignored + // ignored } OnError(); @@ -102,7 +108,8 @@ private static async Task SendRequest(Cluster cluster, ClusterIdentity id, Cance void OnError() { - if (cluster.System.Shutdown.IsCancellationRequested) return; + if (cluster.System.Shutdown.IsCancellationRequested) + return; Interlocked.Increment(ref _failureCount); @@ -124,16 +131,16 @@ private static Task RunBatchClient(int batchSize, Cluster cluster) var logger = Log.CreateLogger(nameof(Program)); - _ = SafeTask.Run(async () => { - var rnd = new Random(); - var semaphore = new AsyncSemaphore(5); + _ = SafeTask.Run(async () => + { + var rnd = new Random(); + var semaphore = new AsyncSemaphore(5); - while (!cluster.System.Shutdown.IsCancellationRequested) - { - await semaphore.WaitAsync(() => RunBatch(rnd, cluster)); - } + while (!cluster.System.Shutdown.IsCancellationRequested) + { + await semaphore.WaitAsync(() => RunBatch(rnd, cluster)); } - ); + }); async Task RunBatch(Random? rnd, Cluster cluster) { @@ -164,7 +171,10 @@ async Task RunBatch(Random? rnd, Cluster cluster) return Task.CompletedTask; } - private static async Task RunWorkers(Func memberFactory, Func startClient) + private static async Task RunWorkers( + Func memberFactory, + Func startClient + ) { var followers = new List(); @@ -205,4 +215,4 @@ private static async Task RunWorkers(Func memberFactory, F sw.Stop(); return sw.Elapsed; } -} \ No newline at end of file +} diff --git a/benchmarks/AutoClusterBenchmark/Runner.cs b/benchmarks/AutoClusterBenchmark/Runner.cs index 3d921b2204..ae17fc72d1 100644 --- a/benchmarks/AutoClusterBenchmark/Runner.cs +++ b/benchmarks/AutoClusterBenchmark/Runner.cs @@ -22,4 +22,4 @@ public class RunMemberInProcGraceful : IRunMember public async Task Start() => _cluster = await Configuration.SpawnMember(); public async Task Kill() => await _cluster!.ShutdownAsync(); -} \ No newline at end of file +} diff --git a/benchmarks/AutoClusterBenchmark/WorkerActor.cs b/benchmarks/AutoClusterBenchmark/WorkerActor.cs index 7c926cc427..2701c21977 100644 --- a/benchmarks/AutoClusterBenchmark/WorkerActor.cs +++ b/benchmarks/AutoClusterBenchmark/WorkerActor.cs @@ -27,4 +27,4 @@ public Task ReceiveAsync(IContext ctx) return Task.CompletedTask; } -} \ No newline at end of file +} diff --git a/benchmarks/ClusterBenchmark/ClrMessages.cs b/benchmarks/ClusterBenchmark/ClrMessages.cs index 2b9537db34..637105c81d 100644 --- a/benchmarks/ClusterBenchmark/ClrMessages.cs +++ b/benchmarks/ClusterBenchmark/ClrMessages.cs @@ -6,4 +6,5 @@ namespace ClusterExperiment1; public record HelloRequestPoco(); -public record HelloResponsePoco(); \ No newline at end of file + +public record HelloResponsePoco(); diff --git a/benchmarks/ClusterBenchmark/Configuration.cs b/benchmarks/ClusterBenchmark/Configuration.cs index 96f54779f2..fe6c0ace52 100644 --- a/benchmarks/ClusterBenchmark/Configuration.cs +++ b/benchmarks/ClusterBenchmark/Configuration.cs @@ -39,19 +39,20 @@ public static class Configuration private static TracerProvider? tracerProvider; #pragma warning disable CS0162 -// ReSharper disable once HeuristicUnreachableCode + // ReSharper disable once HeuristicUnreachableCode private static void InitTracing() { - if (!EnableTracing) return; + if (!EnableTracing) + return; lock (InitLock) { - if (tracerProvider is not null) return; + if (tracerProvider is not null) + return; - tracerProvider = OpenTelemetry.Sdk.CreateTracerProviderBuilder() - .SetResourceBuilder(ResourceBuilder.CreateDefault() - .AddService("ClusterBenchmark") - ) + tracerProvider = OpenTelemetry.Sdk + .CreateTracerProviderBuilder() + .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("ClusterBenchmark")) .AddProtoActorInstrumentation() .AddJaegerExporter(options => options.AgentHost = "localhost") .Build(); @@ -74,7 +75,8 @@ IIdentityLookup identityLookup private static GrpcNetRemoteConfig GetRemoteConfig() { - var portStr = Environment.GetEnvironmentVariable("PROTOPORT") ?? $"{RemoteConfigBase.AnyFreePort}"; + var portStr = + Environment.GetEnvironmentVariable("PROTOPORT") ?? $"{RemoteConfigBase.AnyFreePort}"; var port = int.Parse(portStr); var host = Environment.GetEnvironmentVariable("PROTOHOST") ?? RemoteConfigBase.Localhost; var advertisedHost = Environment.GetEnvironmentVariable("PROTOHOSTPUBLIC"); @@ -82,7 +84,8 @@ private static GrpcNetRemoteConfig GetRemoteConfig() var remoteConfig = GrpcNetRemoteConfig .BindTo(host, port) .WithAdvertisedHost(advertisedHost) - .WithChannelOptions(new GrpcChannelOptions + .WithChannelOptions( + new GrpcChannelOptions { CompressionProviders = new[] { @@ -110,19 +113,24 @@ private static IClusterProvider ClusterProvider() } } - public static IIdentityLookup GetIdentityLookup() => new PartitionIdentityLookup( - new PartitionConfig - { - GetPidTimeout = TimeSpan.FromSeconds(5), - Mode = PartitionIdentityLookup.Mode.Push, - Send = PartitionIdentityLookup.Send.Delta - } - ); + public static IIdentityLookup GetIdentityLookup() => + new PartitionIdentityLookup( + new PartitionConfig + { + GetPidTimeout = TimeSpan.FromSeconds(5), + Mode = PartitionIdentityLookup.Mode.Push, + Send = PartitionIdentityLookup.Send.Delta + } + ); private static IIdentityLookup GetRedisIdentityLookup() { var multiplexer = ConnectionMultiplexer.Connect("localhost:6379"); - var redisIdentityStorage = new RedisIdentityStorage("mycluster", multiplexer, maxConcurrency: 50); + var redisIdentityStorage = new RedisIdentityStorage( + "mycluster", + multiplexer, + maxConcurrency: 50 + ); return new IdentityStorageLookup(redisIdentityStorage); } @@ -155,24 +163,27 @@ private static IMongoDatabase GetMongo() public static async Task SpawnMember() { InitTracing(); - var system = new ActorSystem(GetMemberActorSystemConfig() - ); - system.EventStream.Subscribe(e => { - Console.ForegroundColor = ConsoleColor.Yellow; - Console.WriteLine($"M:{system.Id}-{system.Address}-ClusterTopology:{e.GetMembershipHashCode()}"); - Console.ResetColor(); - } - ); - system.EventStream.Subscribe(e => { - Console.ForegroundColor = ConsoleColor.Cyan; - Console.WriteLine($"M:{system.Id}-{system.Address}-Leader:{e.Leader.Id}"); - Console.ResetColor(); - } - ); + var system = new ActorSystem(GetMemberActorSystemConfig()); + system.EventStream.Subscribe(e => + { + Console.ForegroundColor = ConsoleColor.Yellow; + Console.WriteLine( + $"M:{system.Id}-{system.Address}-ClusterTopology:{e.GetMembershipHashCode()}" + ); + Console.ResetColor(); + }); + system.EventStream.Subscribe(e => + { + Console.ForegroundColor = ConsoleColor.Cyan; + Console.WriteLine($"M:{system.Id}-{system.Address}-Leader:{e.Leader.Id}"); + Console.ResetColor(); + }); var clusterProvider = ClusterProvider(); var identity = GetIdentityLookup(); - system.WithRemote(GetRemoteConfig()).WithCluster(GetClusterConfig(clusterProvider, identity)); + system + .WithRemote(GetRemoteConfig()) + .WithCluster(GetClusterConfig(clusterProvider, identity)); await system.Cluster().StartMemberAsync(); return system.Cluster(); } @@ -193,26 +204,33 @@ private static ActorSystemConfig GetMemberActorSystemConfig() public static async Task SpawnClient() { InitTracing(); - var config = new ActorSystemConfig().WithDeadLetterThrottleCount(3) + var config = new ActorSystemConfig() + .WithDeadLetterThrottleCount(3) .WithSharedFutures() .WithDeadLetterThrottleInterval(TimeSpan.FromSeconds(1)) .WithDeadLetterRequestLogging(false); - var system = new ActorSystem(EnableTracing ? config.WithConfigureProps(props => props.WithTracing()) : config); - system.EventStream.Subscribe(e => { - Console.ForegroundColor = ConsoleColor.Yellow; - Console.WriteLine($"C:{system.Id}-{system.Address}-ClusterTopology:{e.GetMembershipHashCode()}"); - Console.ResetColor(); - } - ); - system.EventStream.Subscribe(e => { - Console.ForegroundColor = ConsoleColor.Cyan; - Console.WriteLine($"C:{system.Id}-{system.Address}-Leader:{e.Leader.Id}"); - Console.ResetColor(); - } + var system = new ActorSystem( + EnableTracing ? config.WithConfigureProps(props => props.WithTracing()) : config ); + system.EventStream.Subscribe(e => + { + Console.ForegroundColor = ConsoleColor.Yellow; + Console.WriteLine( + $"C:{system.Id}-{system.Address}-ClusterTopology:{e.GetMembershipHashCode()}" + ); + Console.ResetColor(); + }); + system.EventStream.Subscribe(e => + { + Console.ForegroundColor = ConsoleColor.Cyan; + Console.WriteLine($"C:{system.Id}-{system.Address}-Leader:{e.Leader.Id}"); + Console.ResetColor(); + }); var clusterProvider = ClusterProvider(); var identity = GetIdentityLookup(); - system.WithRemote(GetRemoteConfig()).WithCluster(GetClusterConfig(clusterProvider, identity)); + system + .WithRemote(GetRemoteConfig()) + .WithCluster(GetClusterConfig(clusterProvider, identity)); await system.Cluster().StartClientAsync(); return system.Cluster(); @@ -220,13 +238,10 @@ public static async Task SpawnClient() public static void SetupLogger(LogLevel loglevel) { - Log.Logger = new LoggerConfiguration() - .WriteTo.Console(LogEventLevel.Error) - .CreateLogger(); + Log.Logger = new LoggerConfiguration().WriteTo.Console(LogEventLevel.Error).CreateLogger(); - Proto.Log.SetLoggerFactory(LoggerFactory.Create(l => - l.AddSerilog().SetMinimumLevel(loglevel) - ) + Proto.Log.SetLoggerFactory( + LoggerFactory.Create(l => l.AddSerilog().SetMinimumLevel(loglevel)) ); } -} \ No newline at end of file +} diff --git a/benchmarks/ClusterBenchmark/DockerSupport.cs b/benchmarks/ClusterBenchmark/DockerSupport.cs index 0d51985fad..ce8bed2d60 100644 --- a/benchmarks/ClusterBenchmark/DockerSupport.cs +++ b/benchmarks/ClusterBenchmark/DockerSupport.cs @@ -8,7 +8,7 @@ namespace ClusterExperiment1; -public static class DockerSupport +public static class DockerSupport { public static async Task Run(Task done) { @@ -56,4 +56,4 @@ public static async Task Run(Task done) await done; Console.WriteLine("Exited......."); } -} \ No newline at end of file +} diff --git a/benchmarks/ClusterBenchmark/Program.cs b/benchmarks/ClusterBenchmark/Program.cs index 86563768d9..701f35dd58 100644 --- a/benchmarks/ClusterBenchmark/Program.cs +++ b/benchmarks/ClusterBenchmark/Program.cs @@ -43,7 +43,10 @@ public static async Task Main(string[] args) var l = typeof(Program).Assembly.Location; Console.WriteLine($"Worker running {l}"); var worker = await Configuration.SpawnMember(); - AppDomain.CurrentDomain.ProcessExit += (sender, args) => { worker.ShutdownAsync().Wait(); }; + AppDomain.CurrentDomain.ProcessExit += (sender, args) => + { + worker.ShutdownAsync().Wait(); + }; Thread.Sleep(Timeout.Infinite); return; @@ -71,7 +74,9 @@ public static async Task Main(string[] args) Console.WriteLine("2) Run single process"); Console.WriteLine("3) Run multi process - graceful exit"); Console.WriteLine("4) Run multi process"); - Console.WriteLine("5) Run single process, single node, Batch(300), ProtoBuf, 10 actors, 60S"); + Console.WriteLine( + "5) Run single process, single node, Batch(300), ProtoBuf, 10 actors, 60S" + ); var memberRunStrategy = Console.ReadLine(); var batchSize = 0; @@ -107,21 +112,25 @@ public static async Task Main(string[] args) { Console.WriteLine("Batch size? default is 50"); - if (!int.TryParse(Console.ReadLine(), out batchSize)) batchSize = 50; + if (!int.TryParse(Console.ReadLine(), out batchSize)) + batchSize = 50; Console.WriteLine($"Using batch size {batchSize}"); } Console.WriteLine("Number of virtual actors? default 10000"); - if (!int.TryParse(Console.ReadLine(), out actorCount)) actorCount = 10_000; + if (!int.TryParse(Console.ReadLine(), out actorCount)) + actorCount = 10_000; Console.WriteLine($"Using {actorCount} actors"); Console.WriteLine("Number of cluster members? default is 8"); - if (!int.TryParse(Console.ReadLine(), out memberCount)) memberCount = 8; + if (!int.TryParse(Console.ReadLine(), out memberCount)) + memberCount = 8; Console.WriteLine($"Using {memberCount} members"); Console.WriteLine("Seconds to run before stopping members? default is 30"); - if (!int.TryParse(Console.ReadLine(), out killTimeoutSeconds)) killTimeoutSeconds = 30; + if (!int.TryParse(Console.ReadLine(), out killTimeoutSeconds)) + killTimeoutSeconds = 30; Console.WriteLine($"Using {killTimeoutSeconds} seconds"); } @@ -132,17 +141,19 @@ public static async Task Main(string[] args) "3" => () => RunFireForgetClient(), "4" => () => RunDebugClient(), "5" => () => RunNoopClient(), - _ => throw new ArgumentOutOfRangeException() + _ => throw new ArgumentOutOfRangeException() }; - var elapsed = await (memberRunStrategy switch - { - "1" => RunWorkers(() => new RunMemberInProcGraceful(), run), - "2" => RunWorkers(() => new RunMemberInProc(), run), - "3" => RunWorkers(() => new RunMemberExternalProcGraceful(), run), - "4" => RunWorkers(() => new RunMemberExternalProc(), run), - _ => throw new ArgumentOutOfRangeException() - }); + var elapsed = await ( + memberRunStrategy switch + { + "1" => RunWorkers(() => new RunMemberInProcGraceful(), run), + "2" => RunWorkers(() => new RunMemberInProc(), run), + "3" => RunWorkers(() => new RunMemberExternalProcGraceful(), run), + "4" => RunWorkers(() => new RunMemberExternalProc(), run), + _ => throw new ArgumentOutOfRangeException() + } + ); var tps = requestCount / elapsed.TotalMilliseconds * 1000; Console.WriteLine(); @@ -152,30 +163,35 @@ public static async Task Main(string[] args) Console.WriteLine($"Throughput:\t{tps:N0} requests/sec -> {(tps * 2):N0} msg/sec"); } - private static void RunNoopClient() - { - } + private static void RunNoopClient() { } private static void RunFireForgetClient() { var logger = Log.CreateLogger(nameof(Program)); - _ = SafeTask.Run(async () => { - var semaphore = new AsyncSemaphore(50); - var cluster = await Configuration.SpawnClient(); - // var rnd = new Random(); - var i = 0; + _ = SafeTask.Run(async () => + { + var semaphore = new AsyncSemaphore(50); + var cluster = await Configuration.SpawnClient(); + // var rnd = new Random(); + var i = 0; - while (true) - { - var id = "myactor" + (i++ % actorCount); - await semaphore.WaitAsync(() => SendRequest(cluster, id, CancellationTokens.FromSeconds(20))); - } + while (true) + { + var id = "myactor" + (i++ % actorCount); + await semaphore.WaitAsync( + () => SendRequest(cluster, id, CancellationTokens.FromSeconds(20)) + ); } - ); + }); } - private static async Task SendRequest(Cluster cluster, ClusterIdentity id, CancellationToken cancellationToken, ISenderContext? context = null) + private static async Task SendRequest( + Cluster cluster, + ClusterIdentity id, + CancellationToken cancellationToken, + ISenderContext? context = null + ) { Interlocked.Increment(ref requestCount); @@ -203,7 +219,7 @@ private static async Task SendRequest(Cluster cluster, ClusterIdentity id, Cance } catch (TimeoutException) { - // ignored + // ignored } OnError(); @@ -223,7 +239,12 @@ void OnError() } } - private static async Task SendRequest(Cluster cluster, string id, CancellationToken cancellationToken, ISenderContext? context = null) + private static async Task SendRequest( + Cluster cluster, + string id, + CancellationToken cancellationToken, + ISenderContext? context = null + ) { Interlocked.Increment(ref requestCount); @@ -236,7 +257,13 @@ private static async Task SendRequest(Cluster cluster, string id, Cancella { try { - await cluster.RequestAsync(id, "hello", Request, context, cancellationToken); + await cluster.RequestAsync( + id, + "hello", + Request, + context, + cancellationToken + ); var res = Interlocked.Increment(ref successCount); @@ -251,7 +278,7 @@ private static async Task SendRequest(Cluster cluster, string id, Cancella } catch (TimeoutException) { - // ignored + // ignored } OnError(); @@ -285,20 +312,20 @@ private static void RunBatchClient(int batchSize) var logger = Log.CreateLogger(nameof(Program)); - _ = SafeTask.Run(async () => { - var cluster = await Configuration.SpawnClient(); - // var rnd = new Random(); - var semaphore = new AsyncSemaphore(5); - var i = 0; + _ = SafeTask.Run(async () => + { + var cluster = await Configuration.SpawnClient(); + // var rnd = new Random(); + var semaphore = new AsyncSemaphore(5); + var i = 0; - while (true) - { - var b = i; - await semaphore.WaitAsync(() => RunBatch(b, cluster)); - i = (i + batchSize) % actorCount; - } + while (true) + { + var b = i; + await semaphore.WaitAsync(() => RunBatch(b, cluster)); + i = (i + batchSize) % actorCount; } - ); + }); async Task RunBatch(int startIndex, Cluster cluster) { @@ -331,53 +358,59 @@ private static void RunDebugClient() { var logger = Log.CreateLogger(nameof(Program)); - _ = SafeTask.Run(async () => { - var cluster = await Configuration.SpawnClient(); - var rnd = new Random(); + _ = SafeTask.Run(async () => + { + var cluster = await Configuration.SpawnClient(); + var rnd = new Random(); - while (true) + while (true) + { + var id = "myactor" + rnd.Next(0, actorCount); + var ct = CancellationTokens.FromSeconds(20); + var res = await SendRequest(cluster, id, ct); + + if (!res) { - var id = "myactor" + rnd.Next(0, actorCount); - var ct = CancellationTokens.FromSeconds(20); - var res = await SendRequest(cluster, id, ct); + var pid = await cluster.GetAsync( + ClusterIdentity.Create(id, "hello"), + CancellationTokens.FromSeconds(10) + ); - if (!res) + if (pid != null) + { + logger.LogError("Failed call to {Id} - {Address}", id, pid.Address); + } + else { - var pid = await cluster.GetAsync(ClusterIdentity.Create(id, "hello"), CancellationTokens.FromSeconds(10)); - - if (pid != null) - { - logger.LogError("Failed call to {Id} - {Address}", id, pid.Address); - } - else - { - logger.LogError("Failed call to {Id} - Null PID", id); - } + logger.LogError("Failed call to {Id} - Null PID", id); } } } - ); + }); } private static void RunClient() { var logger = Log.CreateLogger(nameof(Program)); - _ = SafeTask.Run(async () => { - var cluster = await Configuration.SpawnClient(); - var rnd = new Random(); + _ = SafeTask.Run(async () => + { + var cluster = await Configuration.SpawnClient(); + var rnd = new Random(); - while (true) - { - var id = "myactor" + rnd.Next(0, actorCount); - var ct = CancellationTokens.FromSeconds(20); - await SendRequest(cluster, id, ct); - } + while (true) + { + var id = "myactor" + rnd.Next(0, actorCount); + var ct = CancellationTokens.FromSeconds(20); + await SendRequest(cluster, id, ct); } - ); + }); } - private static async Task RunWorkers(Func memberFactory, Action startClient) + private static async Task RunWorkers( + Func memberFactory, + Action startClient + ) { var followers = new List(); @@ -418,4 +451,4 @@ private static async Task RunWorkers(Func memberFactory, A sw.Stop(); return sw.Elapsed; } -} \ No newline at end of file +} diff --git a/benchmarks/ClusterBenchmark/Runner.cs b/benchmarks/ClusterBenchmark/Runner.cs index 68f789537b..ee1861e723 100644 --- a/benchmarks/ClusterBenchmark/Runner.cs +++ b/benchmarks/ClusterBenchmark/Runner.cs @@ -80,4 +80,4 @@ public Task Kill() _process?.Kill(true); return Task.CompletedTask; } -} \ No newline at end of file +} diff --git a/benchmarks/ClusterBenchmark/WorkerActor.cs b/benchmarks/ClusterBenchmark/WorkerActor.cs index 1165099206..6b77f8eb74 100644 --- a/benchmarks/ClusterBenchmark/WorkerActor.cs +++ b/benchmarks/ClusterBenchmark/WorkerActor.cs @@ -30,11 +30,11 @@ public Task ReceiveAsync(IContext ctx) // if (_rnd.Next(0, 1000) == 0) // { - // + // // // ctx.Stop(ctx.Self); // } return Task.CompletedTask; } -} \ No newline at end of file +} diff --git a/benchmarks/ClusterMicroBenchmarks/ConcurrentSpawnBenchmark.cs b/benchmarks/ClusterMicroBenchmarks/ConcurrentSpawnBenchmark.cs index ca3a72754d..195ffa3b81 100644 --- a/benchmarks/ClusterMicroBenchmarks/ConcurrentSpawnBenchmark.cs +++ b/benchmarks/ClusterMicroBenchmarks/ConcurrentSpawnBenchmark.cs @@ -30,11 +30,12 @@ public class ConcurrentSpawnBenchmark [GlobalSetup] public async Task Setup() { - var echoProps = Props.FromFunc(ctx => { - if (ctx.Sender is not null) ctx.Respond(ctx.Message!); - return Task.CompletedTask; - } - ); + var echoProps = Props.FromFunc(ctx => + { + if (ctx.Sender is not null) + ctx.Respond(ctx.Message!); + return Task.CompletedTask; + }); var echoKind = new ClusterKind(Kind, echoProps); @@ -46,10 +47,12 @@ public async Task Setup() await _cluster.StartMemberAsync(); } - private ClusterConfig ClusterConfig() => Proto.Cluster.ClusterConfig.Setup("test-cluster", - new TestProvider(new TestProviderOptions(), new InMemAgent()), - GetIdentityLookup() - ); + private ClusterConfig ClusterConfig() => + Proto.Cluster.ClusterConfig.Setup( + "test-cluster", + new TestProvider(new TestProviderOptions(), new InMemAgent()), + GetIdentityLookup() + ); private PartitionIdentityLookup GetIdentityLookup() { @@ -80,7 +83,8 @@ public async Task SpawnIdentities() for (var i = 0; i < ConcurrentSpawns; i++) { - if (pids[i] is null) throw new Exception("Failed to return id " + i); + if (pids[i] is null) + throw new Exception("Failed to return id " + i); } } @@ -117,4 +121,4 @@ public enum IdentityLookup Redis, MongoDb } -} \ No newline at end of file +} diff --git a/benchmarks/ClusterMicroBenchmarks/InProcessClusterBatchRequestBenchmark.cs b/benchmarks/ClusterMicroBenchmarks/InProcessClusterBatchRequestBenchmark.cs index 2f66e5650b..809317c2aa 100644 --- a/benchmarks/ClusterMicroBenchmarks/InProcessClusterBatchRequestBenchmark.cs +++ b/benchmarks/ClusterMicroBenchmarks/InProcessClusterBatchRequestBenchmark.cs @@ -37,11 +37,12 @@ public class InProcessClusterBatchRequestBenchmark [GlobalSetup] public async Task Setup() { - var echoProps = Props.FromFunc(ctx => { - if (ctx.Sender is not null) ctx.Respond(ctx.Message!); - return Task.CompletedTask; - } - ); + var echoProps = Props.FromFunc(ctx => + { + if (ctx.Sender is not null) + ctx.Respond(ctx.Message!); + return Task.CompletedTask; + }); var echoKind = new ClusterKind(Kind, echoProps); var sys = new ActorSystem(new ActorSystemConfig()) @@ -66,14 +67,17 @@ public async Task Setup() private ClusterConfig ClusterConfig() { - var config = Proto.Cluster.ClusterConfig.Setup("testcluster", + var config = Proto.Cluster.ClusterConfig.Setup( + "testcluster", new TestProvider(new TestProviderOptions(), new InMemAgent()), new PartitionIdentityLookup() ); if (ExperimentalContext) { - config = config.WithClusterContextProducer(cluster => new DefaultClusterContext(cluster)); + config = config.WithClusterContextProducer( + cluster => new DefaultClusterContext(cluster) + ); } return config; @@ -117,7 +121,9 @@ private ClusterConfig ClusterConfig() [Benchmark] public async Task ClusterRequestAsyncBatchReuseIdentity() { - var ct = PassCancellationToken ? CancellationTokens.FromSeconds(10) : CancellationToken.None; + var ct = PassCancellationToken + ? CancellationTokens.FromSeconds(10) + : CancellationToken.None; using var batch = _cluster.System.Root.CreateBatchContext(BatchSize, ct); var tasks = new Task[BatchSize]; @@ -129,12 +135,13 @@ public async Task ClusterRequestAsyncBatchReuseIdentity() await Task.WhenAll(tasks); } - [Benchmark] public async Task ClusterRequestAsyncReuseIdentity() { - var ct = PassCancellationToken ? CancellationTokens.FromSeconds(10) : CancellationToken.None; + var ct = PassCancellationToken + ? CancellationTokens.FromSeconds(10) + : CancellationToken.None; var tasks = new Task[BatchSize]; @@ -146,4 +153,4 @@ public async Task ClusterRequestAsyncReuseIdentity() await Task.WhenAll(tasks); } -} \ No newline at end of file +} diff --git a/benchmarks/ClusterMicroBenchmarks/InProcessClusterRequestBenchmark.cs b/benchmarks/ClusterMicroBenchmarks/InProcessClusterRequestBenchmark.cs index 4347f355f5..24eb2713c3 100644 --- a/benchmarks/ClusterMicroBenchmarks/InProcessClusterRequestBenchmark.cs +++ b/benchmarks/ClusterMicroBenchmarks/InProcessClusterRequestBenchmark.cs @@ -36,11 +36,12 @@ public class InProcessClusterRequestBenchmark [GlobalSetup] public async Task Setup() { - var echoProps = Props.FromFunc(ctx => { - if (ctx.Sender is not null) ctx.Respond(ctx.Message!); - return Task.CompletedTask; - } - ); + var echoProps = Props.FromFunc(ctx => + { + if (ctx.Sender is not null) + ctx.Respond(ctx.Message!); + return Task.CompletedTask; + }); if (RequestDeduplication) { @@ -54,11 +55,7 @@ public async Task Setup() echoKind.WithLocalAffinityRelocationStrategy(); } - var sys = new ActorSystem(new ActorSystemConfig - { - SharedFutures = SharedFutures - } - ) + var sys = new ActorSystem(new ActorSystemConfig { SharedFutures = SharedFutures }) .WithRemote(GrpcNetRemoteConfig.BindToLocalhost(9090)) .WithCluster(ClusterConfig().WithClusterKind(echoKind)); @@ -71,20 +68,25 @@ public async Task Setup() await _cluster.RequestAsync(_id.Identity, _id.Kind, 1, CancellationToken.None); } - private static ClusterConfig ClusterConfig() => Proto.Cluster.ClusterConfig.Setup("testcluster", - new TestProvider(new TestProviderOptions(), new InMemAgent()), - new PartitionIdentityLookup() - ); + private static ClusterConfig ClusterConfig() => + Proto.Cluster.ClusterConfig.Setup( + "testcluster", + new TestProvider(new TestProviderOptions(), new InMemAgent()), + new PartitionIdentityLookup() + ); [GlobalCleanup] public Task Cleanup() => _cluster.ShutdownAsync(); [Benchmark] - public Task RequestAsync() => _cluster.System.Root.RequestAsync(pid, 1, CancellationToken.None); + public Task RequestAsync() => + _cluster.System.Root.RequestAsync(pid, 1, CancellationToken.None); [Benchmark] - public Task ClusterRequestAsync() => _cluster.RequestAsync(_id.Identity, _id.Kind, 1, CancellationToken.None); + public Task ClusterRequestAsync() => + _cluster.RequestAsync(_id.Identity, _id.Kind, 1, CancellationToken.None); [Benchmark] - public Task ClusterIdentityRequestAsync() => _cluster.RequestAsync(_id, 1, CancellationToken.None); -} \ No newline at end of file + public Task ClusterIdentityRequestAsync() => + _cluster.RequestAsync(_id, 1, CancellationToken.None); +} diff --git a/benchmarks/ClusterMicroBenchmarks/InProcessRequestAsyncBenchmark.cs b/benchmarks/ClusterMicroBenchmarks/InProcessRequestAsyncBenchmark.cs index 35ff7b7181..91bfd0df75 100644 --- a/benchmarks/ClusterMicroBenchmarks/InProcessRequestAsyncBenchmark.cs +++ b/benchmarks/ClusterMicroBenchmarks/InProcessRequestAsyncBenchmark.cs @@ -19,7 +19,6 @@ public class InProcessRequestAsyncBenchmark public int BatchSize { get; set; } [Params(true, false)] - public bool UseSharedFutures { get; set; } private ActorSystem System; @@ -29,16 +28,14 @@ public class InProcessRequestAsyncBenchmark [GlobalSetup] public void Setup() { - var echoProps = Props.FromFunc(ctx => { - if (ctx.Sender is not null) ctx.Respond(ctx.Message!); - return Task.CompletedTask; - } - ); - - System = new ActorSystem(new ActorSystemConfig + var echoProps = Props.FromFunc(ctx => { - SharedFutures = UseSharedFutures + if (ctx.Sender is not null) + ctx.Respond(ctx.Message!); + return Task.CompletedTask; }); + + System = new ActorSystem(new ActorSystemConfig { SharedFutures = UseSharedFutures }); pid = System.Root.SpawnNamed(echoProps, "thing"); } @@ -105,4 +102,4 @@ public async Task BatchContextRequestAsync() await Task.WhenAll(tasks); } -} \ No newline at end of file +} diff --git a/benchmarks/ClusterMicroBenchmarks/ObjectPoolBenchmarks.cs b/benchmarks/ClusterMicroBenchmarks/ObjectPoolBenchmarks.cs index 6a9002bcdc..ce51ed40e3 100644 --- a/benchmarks/ClusterMicroBenchmarks/ObjectPoolBenchmarks.cs +++ b/benchmarks/ClusterMicroBenchmarks/ObjectPoolBenchmarks.cs @@ -66,4 +66,4 @@ public void ConcurrentBag() Bag.Add(Objects[i]); } } -} \ No newline at end of file +} diff --git a/benchmarks/ClusterMicroBenchmarks/Program.cs b/benchmarks/ClusterMicroBenchmarks/Program.cs index 412f49877f..f6e86b9592 100644 --- a/benchmarks/ClusterMicroBenchmarks/Program.cs +++ b/benchmarks/ClusterMicroBenchmarks/Program.cs @@ -5,4 +5,4 @@ namespace ClusterMicroBenchmarks; class Program { static void Main() => BenchmarkRunner.Run(); -} \ No newline at end of file +} diff --git a/benchmarks/ClusterMicroBenchmarks/RendezvousBenchmark.cs b/benchmarks/ClusterMicroBenchmarks/RendezvousBenchmark.cs index 3b0a284eba..4fb57655a3 100644 --- a/benchmarks/ClusterMicroBenchmarks/RendezvousBenchmark.cs +++ b/benchmarks/ClusterMicroBenchmarks/RendezvousBenchmark.cs @@ -21,7 +21,10 @@ public class RendezvousBenchmark public int NodeCount { get; set; } private int _i = 0; - private static readonly string[] Ids = Enumerable.Range(0, 100).Select(_ => Guid.NewGuid().ToString()).ToArray(); + private static readonly string[] Ids = Enumerable + .Range(0, 100) + .Select(_ => Guid.NewGuid().ToString()) + .ToArray(); private Rendezvous _rendezvous; private MemberHashRing _memberHashRing; @@ -31,13 +34,18 @@ public class RendezvousBenchmark [GlobalSetup] public void Setup() { - var members = Enumerable.Range(0, NodeCount).Select(i => new Member - { - Host = "localhost", - Id = Guid.NewGuid().ToString("N"), - Port = i + 1000 - } - ).ToArray(); + var members = Enumerable + .Range(0, NodeCount) + .Select( + i => + new Member + { + Host = "localhost", + Id = Guid.NewGuid().ToString("N"), + Port = i + 1000 + } + ) + .ToArray(); _rendezvous = new Rendezvous(); _rendezvous.UpdateMembers(members); _rendezvousFast = new RendezvousFast(members); @@ -70,4 +78,4 @@ public void HashRing() } private string TestId() => Ids[_i++ % Ids.Length]; -} \ No newline at end of file +} diff --git a/benchmarks/GossipBenchmark/Node1/Program.cs b/benchmarks/GossipBenchmark/Node1/Program.cs index 4aa68b6fc6..4b1a7890cb 100644 --- a/benchmarks/GossipBenchmark/Node1/Program.cs +++ b/benchmarks/GossipBenchmark/Node1/Program.cs @@ -15,41 +15,52 @@ using Proto.Remote; using Proto.Remote.GrpcNet; using static Proto.CancellationTokens; -using ProtosReflection =ClusterHelloWorld.Messages.ProtosReflection; +using ProtosReflection = ClusterHelloWorld.Messages.ProtosReflection; class Program { private static async Task Main() { Proto.Log.SetLoggerFactory( - LoggerFactory.Create(l => l.AddConsole().SetMinimumLevel(LogLevel.Information))); + LoggerFactory.Create(l => l.AddConsole().SetMinimumLevel(LogLevel.Information)) + ); var logger = Log.CreateLogger("benchmark"); - + // Required to allow unencrypted GrpcNet connections AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); - var system = new ActorSystem(new ActorSystemConfig() + var system = new ActorSystem( + new ActorSystemConfig() .WithDeveloperSupervisionLogging(true) .WithDeadLetterRequestLogging(true) .WithDeadLetterResponseLogging(true) - .WithConfigureProps(p => p.WithDeadlineDecorator(TimeSpan.FromSeconds(1), logger).WithLoggingContextDecorator(logger))) - - .WithRemote(GrpcNetRemoteConfig.BindToLocalhost().WithProtoMessages(ProtosReflection.Descriptor)) - .WithCluster(ClusterConfig - .Setup("MyCluster", new SeedNodeClusterProvider(new(("127.0.0.1", 8090))), new PartitionIdentityLookup())); + .WithConfigureProps( + p => + p.WithDeadlineDecorator(TimeSpan.FromSeconds(1), logger) + .WithLoggingContextDecorator(logger) + ) + ) + .WithRemote( + GrpcNetRemoteConfig.BindToLocalhost().WithProtoMessages(ProtosReflection.Descriptor) + ) + .WithCluster( + ClusterConfig.Setup( + "MyCluster", + new SeedNodeClusterProvider(new(("127.0.0.1", 8090))), + new PartitionIdentityLookup() + ) + ); - system.EventStream.Subscribe(e => { - Console.WriteLine($"{DateTime.Now:O} My members {e.TopologyHash}"); - } - ); - - await system - .Cluster() - .StartMemberAsync(); + system.EventStream.Subscribe(e => + { + Console.WriteLine($"{DateTime.Now:O} My members {e.TopologyHash}"); + }); + + await system.Cluster().StartMemberAsync(); Console.WriteLine("Started"); var helloGrain = system.Cluster().GetHelloGrain("MyGrain"); - + var res = await helloGrain.SayHello(new HelloRequest(), FromSeconds(5)); Console.WriteLine(res.Message); @@ -61,4 +72,4 @@ await system Console.WriteLine("Shutting Down..."); await system.Cluster().ShutdownAsync(); } -} \ No newline at end of file +} diff --git a/benchmarks/GossipBenchmark/Node2/Program.cs b/benchmarks/GossipBenchmark/Node2/Program.cs index 8ff20c0ea7..cea5233c91 100644 --- a/benchmarks/GossipBenchmark/Node2/Program.cs +++ b/benchmarks/GossipBenchmark/Node2/Program.cs @@ -23,15 +23,13 @@ public class HelloGrain : HelloGrainBase { private readonly string _identity; - public HelloGrain(IContext ctx, string identity) : base(ctx) => _identity = identity; + public HelloGrain(IContext ctx, string identity) + : base(ctx) => _identity = identity; public override Task SayHello(HelloRequest request) { Console.WriteLine("Got request!!"); - var res = new HelloResponse - { - Message = $"Hello from typed grain {_identity}" - }; + var res = new HelloResponse { Message = $"Hello from typed grain {_identity}" }; return FromResult(res); } @@ -42,43 +40,58 @@ class Program private static async Task Main() { Proto.Log.SetLoggerFactory( - LoggerFactory.Create(l => l.AddConsole().SetMinimumLevel(LogLevel.Information))); - + LoggerFactory.Create(l => l.AddConsole().SetMinimumLevel(LogLevel.Information)) + ); + var logger = Log.CreateLogger("benchmark"); - + // Required to allow unencrypted GrpcNet connections AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); - var system = new ActorSystem(new ActorSystemConfig() + var system = new ActorSystem( + new ActorSystemConfig() .WithDeveloperSupervisionLogging(true) .WithDeadLetterRequestLogging(true) .WithDeadLetterResponseLogging(true) - .WithConfigureProps(p => p.WithDeadlineDecorator(TimeSpan.FromSeconds(1), logger).WithLoggingContextDecorator(logger))) - .WithRemote(GrpcNetRemoteConfig.BindToLocalhost(8090).WithProtoMessages(ProtosReflection.Descriptor)) - .WithCluster(ClusterConfig - .Setup("MyCluster", new SeedNodeClusterProvider(), new PartitionIdentityLookup()) - .WithClusterKind(HelloGrainActor.GetClusterKind((ctx, identity) => new HelloGrain(ctx, identity.Identity))) + .WithConfigureProps( + p => + p.WithDeadlineDecorator(TimeSpan.FromSeconds(1), logger) + .WithLoggingContextDecorator(logger) + ) + ) + .WithRemote( + GrpcNetRemoteConfig + .BindToLocalhost(8090) + .WithProtoMessages(ProtosReflection.Descriptor) + ) + .WithCluster( + ClusterConfig + .Setup( + "MyCluster", + new SeedNodeClusterProvider(), + new PartitionIdentityLookup() + ) + .WithClusterKind( + HelloGrainActor.GetClusterKind( + (ctx, identity) => new HelloGrain(ctx, identity.Identity) + ) + ) ); - - system.EventStream.Subscribe(e => { - Console.WriteLine($"{DateTime.Now:O} My members {e.TopologyHash}"); - } - ); - await system - .Cluster() - .StartMemberAsync(); + system.EventStream.Subscribe(e => + { + Console.WriteLine($"{DateTime.Now:O} My members {e.TopologyHash}"); + }); + + await system.Cluster().StartMemberAsync(); Console.WriteLine("Started..."); - Console.CancelKeyPress += async (e, y) => { + Console.CancelKeyPress += async (e, y) => + { Console.WriteLine("Shutting Down..."); - await system - .Cluster() - .ShutdownAsync(); + await system.Cluster().ShutdownAsync(); }; - - await Delay(-1); } -} \ No newline at end of file +} diff --git a/benchmarks/HostedService/Program.cs b/benchmarks/HostedService/Program.cs index acc67b4fcd..d122d5f259 100644 --- a/benchmarks/HostedService/Program.cs +++ b/benchmarks/HostedService/Program.cs @@ -9,5 +9,8 @@ public static class Program public static IHostBuilder CreateHostBuilder(string[] args) => Host.CreateDefaultBuilder(args) - .ConfigureWebHostDefaults(webBuilder => { webBuilder.UseStartup(); }); -} \ No newline at end of file + .ConfigureWebHostDefaults(webBuilder => + { + webBuilder.UseStartup(); + }); +} diff --git a/benchmarks/HostedService/ProtoHost.cs b/benchmarks/HostedService/ProtoHost.cs index 4d7199b77f..ecb961b386 100644 --- a/benchmarks/HostedService/ProtoHost.cs +++ b/benchmarks/HostedService/ProtoHost.cs @@ -16,7 +16,11 @@ public class ProtoHost : IHostedService private readonly Cluster _cluster; private readonly ILogger _logger; - public ProtoHost(Cluster cluster, ILogger logger, IHostApplicationLifetime appLifetime) + public ProtoHost( + Cluster cluster, + ILogger logger, + IHostApplicationLifetime appLifetime + ) { _cluster = cluster; _logger = logger; @@ -26,7 +30,9 @@ public ProtoHost(Cluster cluster, ILogger logger, IHostApplicationLif public Task StartAsync(CancellationToken cancellationToken) { _logger.LogInformation("Starting cluster..."); - _appLifetime.ApplicationStarted.Register(() => SafeTask.Run(RunRequestLoop, _appLifetime.ApplicationStopping)); + _appLifetime.ApplicationStarted.Register( + () => SafeTask.Run(RunRequestLoop, _appLifetime.ApplicationStopping) + ); _appLifetime.ApplicationStopping.Register(OnStopping); return Task.CompletedTask; } @@ -47,7 +53,12 @@ private async Task RunRequestLoop() for (var i = 0; i < 1000; i++) { var id = rnd.Next(0, 100000); - var t = _cluster.RequestAsync($"abc{id}", "kind", 123, new CancellationTokenSource(20000).Token); + var t = _cluster.RequestAsync( + $"abc{id}", + "kind", + 123, + new CancellationTokenSource(20000).Token + ); tasks.Add(t); } @@ -57,10 +68,12 @@ private async Task RunRequestLoop() private void OnStopping() { - var completedInTime = _cluster.ShutdownAsync() + var completedInTime = _cluster + .ShutdownAsync() .WaitUpTo(TimeSpan.FromSeconds(15)) - .GetAwaiter().GetResult(); + .GetAwaiter() + .GetResult(); if (!completedInTime) _logger.LogError("Shut down cluster timed out..."); } -} \ No newline at end of file +} diff --git a/benchmarks/HostedService/Startup.cs b/benchmarks/HostedService/Startup.cs index e006e7991e..a136b6c26d 100644 --- a/benchmarks/HostedService/Startup.cs +++ b/benchmarks/HostedService/Startup.cs @@ -26,10 +26,8 @@ public class Startup public void ConfigureServices(IServiceCollection services) { services.AddLogging(l => l.AddConsole()); - Log.SetLoggerFactory(LoggerFactory.Create(l1 => - l1.AddConsole() - .SetMinimumLevel(LogLevel.Information) - ) + Log.SetLoggerFactory( + LoggerFactory.Create(l1 => l1.AddConsole().SetMinimumLevel(LogLevel.Information)) ); var settings = MongoClientSettings.FromUrl(MongoUrl.Create("mongodb://127.0.0.1:27017")); @@ -46,15 +44,24 @@ public void ConfigureServices(IServiceCollection services) var clusterProvider = new ConsulProvider(new ConsulProviderConfig()); var identityLookup = new IdentityStorageLookup(new MongoIdentityStorage("foo", pids, 150)); - var sys = new ActorSystem(new ActorSystemConfig().WithDeadLetterThrottleCount(3).WithDeadLetterThrottleInterval(TimeSpan.FromSeconds(1))) + var sys = new ActorSystem( + new ActorSystemConfig() + .WithDeadLetterThrottleCount(3) + .WithDeadLetterThrottleInterval(TimeSpan.FromSeconds(1)) + ) .WithRemote(GrpcNetRemoteConfig.BindToLocalhost(9090)) - .WithCluster(ClusterConfig.Setup("test", clusterProvider, identityLookup) - .WithClusterKind("kind", Props.FromFunc(ctx => { - if (ctx.Message is int i) ctx.Respond(i * 2); + .WithCluster( + ClusterConfig + .Setup("test", clusterProvider, identityLookup) + .WithClusterKind( + "kind", + Props.FromFunc(ctx => + { + if (ctx.Message is int i) + ctx.Respond(i * 2); return Task.CompletedTask; - } + }) ) - ) ); sys.Cluster().StartMemberAsync().Wait(); @@ -72,7 +79,9 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { app.UseDeveloperExceptionPage(); app.UseSwagger(); - app.UseSwaggerUI(c => c.SwaggerEndpoint("/swagger/v1/swagger.json", "HostedService v1")); + app.UseSwaggerUI( + c => c.SwaggerEndpoint("/swagger/v1/swagger.json", "HostedService v1") + ); } app.UseHttpsRedirection(); @@ -81,6 +90,9 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env) app.UseAuthorization(); - app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); + app.UseEndpoints(endpoints => + { + endpoints.MapControllers(); + }); } -} \ No newline at end of file +} diff --git a/benchmarks/InprocessBenchmark/Program.cs b/benchmarks/InprocessBenchmark/Program.cs index ffccb2c885..432951a4b8 100644 --- a/benchmarks/InprocessBenchmark/Program.cs +++ b/benchmarks/InprocessBenchmark/Program.cs @@ -22,12 +22,12 @@ private static async Task Main() const int batchSize = 100; Console.WriteLine("ClientCount\t\tDispatcher\t\tElapsed\t\tMsg/sec"); - var tps = new[] {50, 100, 200, 400, 800}; - int[] clientCounts = {4, 8, 16, 32}; + var tps = new[] { 50, 100, 200, 400, 800 }; + int[] clientCounts = { 4, 8, 16, 32 }; foreach (var t in tps) { - var d = new ThreadPoolDispatcher {Throughput = t}; + var d = new ThreadPoolDispatcher { Throughput = t }; foreach (var clientCount in clientCounts) { @@ -35,14 +35,14 @@ private static async Task Main() var pongActor = new PID[clientCount]; var completions = new TaskCompletionSource[clientCount]; - var pongProps = Props.FromProducer(() => new PongActor()) - .WithDispatcher(d); + var pongProps = Props.FromProducer(() => new PongActor()).WithDispatcher(d); for (var i = 0; i < clientCount; i++) { var tsc = new TaskCompletionSource(); completions[i] = tsc; - var pingProps = Props.FromProducer(() => new PingActor(tsc, messageCount, batchSize)) + var pingProps = Props + .FromProducer(() => new PingActor(tsc, messageCount, batchSize)) .WithDispatcher(d); pingActor[i] = context.Spawn(pingProps); @@ -65,10 +65,13 @@ private static async Task Main() sw.Stop(); var totalMessages = messageCount * 2 * clientCount; - var x = ((int) (totalMessages / (double) sw.ElapsedMilliseconds * 1000.0d)).ToString("#,##0,,M", + var x = ((int)(totalMessages / (double)sw.ElapsedMilliseconds * 1000.0d)).ToString( + "#,##0,,M", CultureInfo.InvariantCulture ); - Console.WriteLine($"{clientCount}\t\t\t{t}\t\t\t{sw.ElapsedMilliseconds} ms\t\t{x}"); + Console.WriteLine( + $"{clientCount}\t\t\t{t}\t\t\t{sw.ElapsedMilliseconds} ms\t\t{x}" + ); await Task.Delay(2000); } } @@ -130,8 +133,10 @@ public Task ReceiveAsync(IContext context) case Msg m: _messageCount--; - if (_messageCount <= 0) _wgStop.SetResult(true); - else context.Send(_targetPid, m); + if (_messageCount <= 0) + _wgStop.SetResult(true); + else + context.Send(_targetPid, m); break; } diff --git a/benchmarks/KubernetesDiagnostics/Program.cs b/benchmarks/KubernetesDiagnostics/Program.cs index cb3298171a..7096e7bce7 100644 --- a/benchmarks/KubernetesDiagnostics/Program.cs +++ b/benchmarks/KubernetesDiagnostics/Program.cs @@ -14,30 +14,34 @@ var advertisedHost = Environment.GetEnvironmentVariable("PROTOHOSTPUBLIC"); var builder = WebApplication.CreateBuilder(args); -builder.Services.AddLogging(x => x.AddSimpleConsole(c => -{ - c.SingleLine = true; -})); +builder.Services.AddLogging( + x => + x.AddSimpleConsole(c => + { + c.SingleLine = true; + }) +); +builder.Services.AddProtoCluster( + (_, x) => + { + x.Port = 0; + x.ConfigureRemote = r => r.WithAdvertisedHost(advertisedHost); -builder.Services.AddProtoCluster((_, x) => -{ - x.Port = 0; - x.ConfigureRemote = r => - r.WithAdvertisedHost(advertisedHost); - - x.ConfigureCluster = c => c - .WithClusterKind("echo", Props.FromFunc(ctx => Task.CompletedTask)) - .WithClusterKind("empty", Props.FromFunc(ctx => Task.CompletedTask)) - .WithExitOnShutdown() - .WithHeartbeatExpirationDisabled(); - - x.ClusterProvider = new KubernetesProvider(); - x.IdentityLookup = new PartitionActivatorLookup(); - -}); - -builder.Services.AddHealthChecks().AddCheck("proto", null, new[] { "ready", "live" }); + x.ConfigureCluster = c => + c.WithClusterKind("echo", Props.FromFunc(ctx => Task.CompletedTask)) + .WithClusterKind("empty", Props.FromFunc(ctx => Task.CompletedTask)) + .WithExitOnShutdown() + .WithHeartbeatExpirationDisabled(); + + x.ClusterProvider = new KubernetesProvider(); + x.IdentityLookup = new PartitionActivatorLookup(); + } +); + +builder.Services + .AddHealthChecks() + .AddCheck("proto", null, new[] { "ready", "live" }); builder.Services.AddHostedService(); var app = builder.Build(); @@ -65,12 +69,11 @@ public async Task StartAsync(CancellationToken cancellationToken) _logger.LogInformation("Starting DummyHostedService"); _running = true; - _system.EventStream.Subscribe(e => { - - var hash = e.TopologyHash; - _logger.LogInformation($"{DateTime.Now:O} My members {hash}"); - } - ); + _system.EventStream.Subscribe(e => + { + var hash = e.TopologyHash; + _logger.LogInformation($"{DateTime.Now:O} My members {hash}"); + }); var props = Props.FromFunc(ctx => Task.CompletedTask); _system.Root.SpawnNamed(props, "dummy"); @@ -81,8 +84,10 @@ public async Task StartAsync(CancellationToken cancellationToken) private async Task RunLoop() { - var clusterIdentity = - ClusterIdentity.Create("some-id", new ClusterKind("echo", Props.FromFunc(ctx => Task.CompletedTask)).Name); + var clusterIdentity = ClusterIdentity.Create( + "some-id", + new ClusterKind("echo", Props.FromFunc(ctx => Task.CompletedTask)).Name + ); while (_running) { @@ -90,8 +95,13 @@ private async Task RunLoop() try { - var t = await _system.Cluster() - .RequestAsync(clusterIdentity, new Touch(), CancellationTokens.FromSeconds(1)); + var t = await _system + .Cluster() + .RequestAsync( + clusterIdentity, + new Touch(), + CancellationTokens.FromSeconds(1) + ); _logger.LogInformation($"called cluster actor {t.Who}"); } @@ -106,7 +116,11 @@ private async Task RunLoop() try { - var t = await _system.Root.RequestAsync(pid, new Touch(), CancellationTokens.FromSeconds(1)); + var t = await _system.Root.RequestAsync( + pid, + new Touch(), + CancellationTokens.FromSeconds(1) + ); if (t != null) { @@ -126,10 +140,9 @@ private async Task RunLoop() await Task.Delay(5000); } } - + private async Task PrintMembersLoop() { - while (_running) { var m = _system.Cluster().MemberList.GetAllMembers(); @@ -146,4 +159,4 @@ public Task StopAsync(CancellationToken cancellationToken) _running = false; return Task.CompletedTask; } -} \ No newline at end of file +} diff --git a/benchmarks/MailboxBenchmark/MailboxBenchmark.cs b/benchmarks/MailboxBenchmark/MailboxBenchmark.cs index ff47ce720e..26a9b40f43 100644 --- a/benchmarks/MailboxBenchmark/MailboxBenchmark.cs +++ b/benchmarks/MailboxBenchmark/MailboxBenchmark.cs @@ -21,17 +21,18 @@ private static async Task RunTest(MailboxProducer mailbox) { var context = new RootContext(new ActorSystem()); const int n = 10 * 1000; - var props = Props.FromFunc(c => { - switch (c.Message) - { - case string s: - c.Respond("done"); - break; - } - - return Task.CompletedTask; + var props = Props + .FromFunc(c => + { + switch (c.Message) + { + case string s: + c.Respond("done"); + break; } - ) + + return Task.CompletedTask; + }) .WithMailbox(mailbox); var pid = context.Spawn(props); @@ -42,4 +43,4 @@ private static async Task RunTest(MailboxProducer mailbox) await context.RequestAsync(pid, "stop"); } -} \ No newline at end of file +} diff --git a/benchmarks/MailboxBenchmark/Program.cs b/benchmarks/MailboxBenchmark/Program.cs index 4c50c0ce29..f1924fc722 100644 --- a/benchmarks/MailboxBenchmark/Program.cs +++ b/benchmarks/MailboxBenchmark/Program.cs @@ -8,4 +8,4 @@ class Program { private static void Main() => BenchmarkRunner.Run(); -} \ No newline at end of file +} diff --git a/benchmarks/PingPongBenchmark/PingActor.cs b/benchmarks/PingPongBenchmark/PingActor.cs index b59d70a30a..e7c180030b 100644 --- a/benchmarks/PingPongBenchmark/PingActor.cs +++ b/benchmarks/PingPongBenchmark/PingActor.cs @@ -47,7 +47,8 @@ public Task ReceiveAsync(IContext context) Console.Write("."); context.Send(_replyTo, true); } - else if (_messageCount > 0) context.Send(_pong, new PingMsg(context.Self)); + else if (_messageCount > 0) + context.Send(_pong, new PingMsg(context.Self)); break; } @@ -64,4 +65,4 @@ public class Start public PID Sender { get; } } -} \ No newline at end of file +} diff --git a/benchmarks/PingPongBenchmark/PongActor.cs b/benchmarks/PingPongBenchmark/PongActor.cs index a13bd9a42a..1eea4cb175 100644 --- a/benchmarks/PingPongBenchmark/PongActor.cs +++ b/benchmarks/PingPongBenchmark/PongActor.cs @@ -17,12 +17,14 @@ public Task ReceiveAsync(IContext context) case PingMsg msg: messagesLeft--; - if (messagesLeft == 0) Console.Write("#"); - else if (messagesLeft < 0) Console.Write("!"); //should not happen + if (messagesLeft == 0) + Console.Write("#"); + else if (messagesLeft < 0) + Console.Write("!"); //should not happen context.Send(msg.Sender, new PongMsg()); break; } return Task.CompletedTask; } -} \ No newline at end of file +} diff --git a/benchmarks/PingPongBenchmark/Program.cs b/benchmarks/PingPongBenchmark/Program.cs index 958d92847b..99af9e88f2 100644 --- a/benchmarks/PingPongBenchmark/Program.cs +++ b/benchmarks/PingPongBenchmark/Program.cs @@ -12,12 +12,12 @@ private static async Task Main() { const int messageCount = 1000000; const int batchSize = 100; - int[] clientCounts = {8, 16, 32}; - var tps = new[] {50, 100, 200, 400, 800}; + int[] clientCounts = { 8, 16, 32 }; + var tps = new[] { 50, 100, 200, 400, 800 }; foreach (var t in tps) { - var d = new ThreadPoolDispatcher {Throughput = t}; + var d = new ThreadPoolDispatcher { Throughput = t }; foreach (var clientCount in clientCounts) { @@ -29,15 +29,9 @@ private static async Task Main() for (var i = 0; i < clientCount; i++) { pingActors[i] = sys.Root.Spawn( - PingActor - .Props(messageCount, batchSize) - .WithDispatcher(d) - ); - pongActors[i] = sys.Root.Spawn( - PongActor - .Props - .WithDispatcher(d) + PingActor.Props(messageCount, batchSize).WithDispatcher(d) ); + pongActors[i] = sys.Root.Spawn(PongActor.Props.WithDispatcher(d)); } Console.WriteLine("Actors created"); @@ -50,7 +44,10 @@ private static async Task Main() var pingActor = pingActors[i]; var pongActor = pongActors[i]; - tasks[i] = sys.Root.RequestAsync(pingActor, new PingActor.Start(pongActor)); + tasks[i] = sys.Root.RequestAsync( + pingActor, + new PingActor.Start(pongActor) + ); } Console.WriteLine("Waiting for actors"); @@ -58,10 +55,10 @@ private static async Task Main() sw.Stop(); var totalMessages = messageCount * 2 * clientCount; - var x = (int) (totalMessages / (double) sw.ElapsedMilliseconds * 1000.0d); + var x = (int)(totalMessages / (double)sw.ElapsedMilliseconds * 1000.0d); Console.WriteLine(); Console.WriteLine($"{clientCount}\t\t{sw.ElapsedMilliseconds}\t\t{x:n0}"); } } } -} \ No newline at end of file +} diff --git a/benchmarks/PropsBenchmark/Program.cs b/benchmarks/PropsBenchmark/Program.cs index 00e6cddcad..74fcfe2134 100644 --- a/benchmarks/PropsBenchmark/Program.cs +++ b/benchmarks/PropsBenchmark/Program.cs @@ -6,17 +6,26 @@ using Proto.Mailbox; var system = new ActorSystem(); -var props = Props.FromFunc(ctx => Task.CompletedTask).WithMailbox(() => new DefaultMailbox(new LockingUnboundedMailboxQueue(4), new UnboundedMailboxQueue())); +var props = Props + .FromFunc(ctx => Task.CompletedTask) + .WithMailbox( + () => new DefaultMailbox(new LockingUnboundedMailboxQueue(4), new UnboundedMailboxQueue()) + ); Console.WriteLine("Starting"); for (var i = 0; i < 1_000_000; i++) { var i1 = i; - var pid = system.Root.SpawnNamed(props,"x" + i, ctx => { - ctx.Set(new SomeState("SomeId" + i1)); - }); + var pid = system.Root.SpawnNamed( + props, + "x" + i, + ctx => + { + ctx.Set(new SomeState("SomeId" + i1)); + } + ); } Console.WriteLine("Done"); -Console.ReadLine(); \ No newline at end of file +Console.ReadLine(); diff --git a/benchmarks/PropsBenchmark/SomeState.cs b/benchmarks/PropsBenchmark/SomeState.cs index 0daf3f5c40..3960a00675 100644 --- a/benchmarks/PropsBenchmark/SomeState.cs +++ b/benchmarks/PropsBenchmark/SomeState.cs @@ -5,4 +5,4 @@ // ----------------------------------------------------------------------- namespace PropsBenchmark; -public record SomeState(string Name); \ No newline at end of file +public record SomeState(string Name); diff --git a/benchmarks/ProtoActorBenchmarks/LongBenchmark.cs b/benchmarks/ProtoActorBenchmarks/LongBenchmark.cs index 2c3e77ab02..3acb06769c 100644 --- a/benchmarks/ProtoActorBenchmarks/LongBenchmark.cs +++ b/benchmarks/ProtoActorBenchmarks/LongBenchmark.cs @@ -32,14 +32,15 @@ public class LongBenchmark [Benchmark] public Task InProcessPingPong() { - var d = new ThreadPoolDispatcher {Throughput = Tps}; + var d = new ThreadPoolDispatcher { Throughput = Tps }; var clientCount = Environment.ProcessorCount * 1; var clients = new PID[clientCount]; var echos = new PID[clientCount]; var completions = new TaskCompletionSource[clientCount]; - var echoProps = Props.FromProducer(() => new EchoActor()) + var echoProps = Props + .FromProducer(() => new EchoActor()) .WithDispatcher(d) .WithMailbox(() => BoundedMailbox.Create(2048)); @@ -48,7 +49,8 @@ public Task InProcessPingPong() var tsc = new TaskCompletionSource(); completions[i] = tsc; - var clientProps = Props.FromProducer(() => new PingActor(tsc, MessageCount, BatchSize)) + var clientProps = Props + .FromProducer(() => new PingActor(tsc, MessageCount, BatchSize)) .WithDispatcher(d) .WithMailbox(() => BoundedMailbox.Create(2048)); @@ -68,4 +70,4 @@ public Task InProcessPingPong() return Task.WhenAll(tasks); } -} \ No newline at end of file +} diff --git a/benchmarks/ProtoActorBenchmarks/Program.cs b/benchmarks/ProtoActorBenchmarks/Program.cs index dc3036a924..0feb224d5a 100644 --- a/benchmarks/ProtoActorBenchmarks/Program.cs +++ b/benchmarks/ProtoActorBenchmarks/Program.cs @@ -10,4 +10,4 @@ namespace ProtoActorBenchmarks; public static class Program { public static void Main() => BenchmarkRunner.Run(); -} \ No newline at end of file +} diff --git a/benchmarks/ProtoActorBenchmarks/ShortBenchmark.cs b/benchmarks/ProtoActorBenchmarks/ShortBenchmark.cs index a6eb200b05..4c4a14ba5e 100644 --- a/benchmarks/ProtoActorBenchmarks/ShortBenchmark.cs +++ b/benchmarks/ProtoActorBenchmarks/ShortBenchmark.cs @@ -24,7 +24,8 @@ public void Setup() { _context = new RootContext(new ActorSystem()); - _echoProps = Props.FromProducer(() => new EchoActor2()) + _echoProps = Props + .FromProducer(() => new EchoActor2()) .WithMailbox(() => BoundedMailbox.Create(2048)); _echoActor = _context.Spawn(_echoProps); _timeout = TimeSpan.FromSeconds(5); @@ -32,4 +33,4 @@ public void Setup() [Benchmark] public Task InProcessPingPong() => _context.RequestAsync(_echoActor, "ping", _timeout); -} \ No newline at end of file +} diff --git a/benchmarks/ProtoActorBenchmarks/SkyNetBenchmark.cs b/benchmarks/ProtoActorBenchmarks/SkyNetBenchmark.cs index 21f053b874..f0b9b19152 100644 --- a/benchmarks/ProtoActorBenchmarks/SkyNetBenchmark.cs +++ b/benchmarks/ProtoActorBenchmarks/SkyNetBenchmark.cs @@ -8,6 +8,7 @@ using System.Threading.Tasks; using BenchmarkDotNet.Attributes; using Proto; + // ReSharper disable MethodHasAsyncOverload namespace ProtoActorBenchmarks; @@ -42,11 +43,10 @@ public void Setup() public Task SkyNetRequestAsync() { var pid = _context.Spawn(SkyNetRequestResponseActor.Props(_actorSystem)); - return _context.RequestAsync(pid, new Request - { - Num = 0, - Size = 1000000, - }, CancellationToken.None + return _context.RequestAsync( + pid, + new Request { Num = 0, Size = 1000000, }, + CancellationToken.None ); } @@ -54,11 +54,10 @@ public Task SkyNetRequestAsync() public Task SkyNetMessaging() { var pid = _context.Spawn(SkynetActor.Props(_actorSystem)); - return _context.RequestAsync(pid, new Request - { - Num = 0, - Size = 1000000, - }, CancellationToken.None + return _context.RequestAsync( + pid, + new Request { Num = 0, Size = 1000000, }, + CancellationToken.None ); } @@ -85,11 +84,9 @@ public async Task ReceiveAsync(IContext context) for (var i = 0; i < 10; i++) { var pid = _system.Root.Spawn(Props(_system)); - tasks[i] = context.RequestAsync(pid, new Request - { - Size = each, - Num = calc.Num + i * each - } + tasks[i] = context.RequestAsync( + pid, + new Request { Size = each, Num = calc.Num + i * each } ); } @@ -108,7 +105,8 @@ public async Task ReceiveAsync(IContext context) private static SkyNetRequestResponseActor ProduceActor(ActorSystem system) => new(system); - public static Props Props(ActorSystem system) => Proto.Props.FromProducer(() => ProduceActor(system)); + public static Props Props(ActorSystem system) => + Proto.Props.FromProducer(() => ProduceActor(system)); } class SkynetActor : IActor @@ -126,28 +124,28 @@ public Task ReceiveAsync(IContext context) switch (msg) { - case Request {Size: 1} r: + case Request { Size: 1 } r: context.Respond(r.Num); context.Stop(context.Self); return Task.CompletedTask; - case Request r: { + case Request r: + { _replies = 10; _replyTo = context.Sender; for (var i = 0; i < 10; i++) { var child = _system.Root.Spawn(Props(_system)); - context.Request(child, new Request - { - Num = r.Num + i * (r.Size / 10), - Size = r.Size / 10, - } + context.Request( + child, + new Request { Num = r.Num + i * (r.Size / 10), Size = r.Size / 10, } ); } return Task.CompletedTask; } - case long res: { + case long res: + { _sum += res; _replies--; @@ -166,7 +164,8 @@ public Task ReceiveAsync(IContext context) private static SkynetActor ProduceActor(ActorSystem system) => new(system); - public static Props Props(ActorSystem system) => Proto.Props.FromProducer(() => ProduceActor(system)); + public static Props Props(ActorSystem system) => + Proto.Props.FromProducer(() => ProduceActor(system)); } class Request @@ -174,4 +173,4 @@ class Request public long Num; public long Size; } -} \ No newline at end of file +} diff --git a/benchmarks/ProtoActorBenchmarks/TestActors.cs b/benchmarks/ProtoActorBenchmarks/TestActors.cs index 495d57cbd9..e7619552e2 100644 --- a/benchmarks/ProtoActorBenchmarks/TestActors.cs +++ b/benchmarks/ProtoActorBenchmarks/TestActors.cs @@ -32,9 +32,11 @@ public Task ReceiveAsync(IContext context) case Msg m: _batch--; - if (_batch > 0) break; + if (_batch > 0) + break; - if (!SendBatch(context, m.Sender)) _wgStop.SetResult(true); + if (!SendBatch(context, m.Sender)) + _wgStop.SetResult(true); break; } @@ -44,7 +46,8 @@ public Task ReceiveAsync(IContext context) private bool SendBatch(IContext context, PID sender) { - if (_messageCount == 0) return false; + if (_messageCount == 0) + return false; var m = new Msg(context.Self); @@ -101,4 +104,4 @@ public class Msg public Msg(PID sender) => Sender = sender; public PID Sender { get; } -} \ No newline at end of file +} diff --git a/benchmarks/RemoteBenchmark/Messages/Messages.cs b/benchmarks/RemoteBenchmark/Messages/Messages.cs index 40fc347bd2..ffcb093e3d 100644 --- a/benchmarks/RemoteBenchmark/Messages/Messages.cs +++ b/benchmarks/RemoteBenchmark/Messages/Messages.cs @@ -7,10 +7,6 @@ namespace Messages; -public partial class Ping : ICachedSerialization -{ -} - -public partial class Pong : ICachedSerialization -{ -} \ No newline at end of file +public partial class Ping : ICachedSerialization { } + +public partial class Pong : ICachedSerialization { } diff --git a/benchmarks/RemoteBenchmark/Node1/Program.cs b/benchmarks/RemoteBenchmark/Node1/Program.cs index 6246b0b056..804b0cbdad 100644 --- a/benchmarks/RemoteBenchmark/Node1/Program.cs +++ b/benchmarks/RemoteBenchmark/Node1/Program.cs @@ -21,11 +21,13 @@ class Program { private static async Task Main() { - Log.SetLoggerFactory(LoggerFactory.Create(c => c - .SetMinimumLevel(LogLevel.Information) - .AddFilter("Microsoft", LogLevel.None) - .AddFilter("Grpc", LogLevel.None) - .AddConsole() + Log.SetLoggerFactory( + LoggerFactory.Create( + c => + c.SetMinimumLevel(LogLevel.Information) + .AddFilter("Microsoft", LogLevel.None) + .AddFilter("Grpc", LogLevel.None) + .AddConsole() ) ); @@ -34,15 +36,12 @@ private static async Task Main() AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); #endif - - var serverRemote = 0; Console.WriteLine("Enter 0 to use Server to Server communication (Default)"); Console.WriteLine("Enter 1 to use Client to Remote communication"); if (!int.TryParse(Console.ReadLine(), out serverRemote)) serverRemote = 0; - var advertisedHost = ""; @@ -57,7 +56,8 @@ private static async Task Main() Console.WriteLine("Enter remote advertised host (Default = 127.0.0.1)"); var remoteAddress = Console.ReadLine().Trim(); - if (string.IsNullOrEmpty(remoteAddress)) remoteAddress = "127.0.0.1"; + if (string.IsNullOrEmpty(remoteAddress)) + remoteAddress = "127.0.0.1"; var actorSystemConfig = new ActorSystemConfig() .WithDeadLetterThrottleCount(10) @@ -67,88 +67,96 @@ private static async Task Main() IRemote remote; - var remoteConfig = GrpcNetRemoteConfig - .BindTo(advertisedHost) - .WithChannelOptions(new GrpcChannelOptions - { - CompressionProviders = new[] - { + .BindTo(advertisedHost) + .WithChannelOptions( + new GrpcChannelOptions + { + CompressionProviders = new[] + { new GzipCompressionProvider(CompressionLevel.Fastest) - } - } - ) - .WithEndpointWriterMaxRetries(3) - .WithProtoMessages(ProtosReflection.Descriptor); + } + } + ) + .WithEndpointWriterMaxRetries(3) + .WithProtoMessages(ProtosReflection.Descriptor); if (serverRemote == 0) remote = new GrpcNetRemote(system, remoteConfig); else remote = new GrpcNetClientRemote(system, remoteConfig); - await remote.StartAsync(); var msg = new Ping(); var messageCount = 1_000_000; var cancellationTokenSource = new CancellationTokenSource(); - _ = SafeTask.Run(async () => { - while (!cancellationTokenSource.IsCancellationRequested) + _ = SafeTask.Run( + async () => { - var semaphore = new SemaphoreSlim(0); - var props = Props.FromProducer(() => new LocalActor(0, messageCount, semaphore)); - - var pid = context.Spawn(props); - - try + while (!cancellationTokenSource.IsCancellationRequested) { - var actorPidResponse = - await remote.SpawnAsync($"{remoteAddress}:12000", "echo", TimeSpan.FromSeconds(1)); + var semaphore = new SemaphoreSlim(0); + var props = Props.FromProducer( + () => new LocalActor(0, messageCount, semaphore) + ); - if (actorPidResponse.StatusCode == (int) ResponseStatusCode.OK) + var pid = context.Spawn(props); + + try { - var remotePid = actorPidResponse.Pid; - await context.RequestAsync(remotePid, new StartRemote { Sender = pid }, + var actorPidResponse = await remote.SpawnAsync( + $"{remoteAddress}:12000", + "echo", TimeSpan.FromSeconds(1) ); - var stopWatch = new Stopwatch(); - stopWatch.Start(); - Console.WriteLine("Starting to send"); - - for (var i = 0; i < messageCount; i++) + if (actorPidResponse.StatusCode == (int)ResponseStatusCode.OK) { - context.Send(remotePid, msg); - } + var remotePid = actorPidResponse.Pid; + await context.RequestAsync( + remotePid, + new StartRemote { Sender = pid }, + TimeSpan.FromSeconds(1) + ); + var stopWatch = new Stopwatch(); + stopWatch.Start(); + Console.WriteLine("Starting to send"); + + for (var i = 0; i < messageCount; i++) + { + context.Send(remotePid, msg); + } - var linkedTokenSource = - CancellationTokenSource.CreateLinkedTokenSource(cancellationTokenSource.Token, + var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource( + cancellationTokenSource.Token, new CancellationTokenSource(5000).Token ); - await semaphore.WaitAsync(linkedTokenSource.Token); - stopWatch.Stop(); - var elapsed = stopWatch.Elapsed; - Console.WriteLine("Elapsed {0}", elapsed); - - var t = messageCount * 2.0 / elapsed.TotalMilliseconds * 1000; - Console.Clear(); - Console.WriteLine("Throughput {0} msg / sec", t); - await context.StopAsync(remotePid); + await semaphore.WaitAsync(linkedTokenSource.Token); + stopWatch.Stop(); + var elapsed = stopWatch.Elapsed; + Console.WriteLine("Elapsed {0}", elapsed); + + var t = messageCount * 2.0 / elapsed.TotalMilliseconds * 1000; + Console.Clear(); + Console.WriteLine("Throughput {0} msg / sec", t); + await context.StopAsync(remotePid); + } + } + catch (OperationCanceledException) + { + await Task.Delay(1000); + } + catch (Exception e) + { + logger?.LogError(e, "Error"); + await Task.Delay(5000); } - } - catch (OperationCanceledException) - { - await Task.Delay(1000); - } - catch (Exception e) - { - logger?.LogError(e, "Error"); - await Task.Delay(5000); - } - await context.PoisonAsync(pid); - } - }, cancellationTokenSource.Token + await context.PoisonAsync(pid); + } + }, + cancellationTokenSource.Token ); Console.ReadLine(); @@ -181,11 +189,12 @@ public Task ReceiveAsync(IContext context) { // Console.WriteLine(_count); } - if (_count == _messageCount) _semaphore.Release(); + if (_count == _messageCount) + _semaphore.Release(); break; } return Task.CompletedTask; } } -} \ No newline at end of file +} diff --git a/benchmarks/RemoteBenchmark/Node2/Program.cs b/benchmarks/RemoteBenchmark/Node2/Program.cs index 7ca663ca9a..a6ef9cba3f 100644 --- a/benchmarks/RemoteBenchmark/Node2/Program.cs +++ b/benchmarks/RemoteBenchmark/Node2/Program.cs @@ -21,6 +21,7 @@ public class EchoActor : IActor { private PID _sender; private static readonly Pong Pong = new Pong(); + // private int _count = 0; public Task ReceiveAsync(IContext context) @@ -49,15 +50,16 @@ class Program { private static async Task Main() { - Log.SetLoggerFactory(LoggerFactory.Create(c => c - .SetMinimumLevel(LogLevel.Information) - .AddFilter("Microsoft", LogLevel.None) - .AddFilter("Grpc", LogLevel.None) - .AddConsole() + Log.SetLoggerFactory( + LoggerFactory.Create( + c => + c.SetMinimumLevel(LogLevel.Information) + .AddFilter("Microsoft", LogLevel.None) + .AddFilter("Grpc", LogLevel.None) + .AddConsole() ) ); - Console.WriteLine("Enter Advertised Host (Default = 127.0.0.1)"); var advertisedHost = Console.ReadLine().Trim(); if (string.IsNullOrEmpty(advertisedHost)) @@ -69,10 +71,11 @@ private static async Task Main() var system = new ActorSystem(actorSystemConfig); var context = new RootContext(system); IRemote remote; - + var remoteConfig = GrpcNetRemoteConfig .BindTo(advertisedHost, 12000) - .WithChannelOptions(new GrpcChannelOptions + .WithChannelOptions( + new GrpcChannelOptions { CompressionProviders = new ICompressionProvider[] { @@ -90,4 +93,4 @@ private static async Task Main() Console.ReadLine(); await remote.ShutdownAsync(); } -} \ No newline at end of file +} diff --git a/benchmarks/SkyriseMini/Client/ActorSystemHostedService.cs b/benchmarks/SkyriseMini/Client/ActorSystemHostedService.cs index d3df7c4e3f..c88642b799 100644 --- a/benchmarks/SkyriseMini/Client/ActorSystemHostedService.cs +++ b/benchmarks/SkyriseMini/Client/ActorSystemHostedService.cs @@ -10,7 +10,10 @@ public class ActorSystemHostedService : IHostedService private readonly ActorSystem _actorSystem; private readonly ILogger _logger; - public ActorSystemHostedService(ActorSystem actorSystem, ILogger logger) + public ActorSystemHostedService( + ActorSystem actorSystem, + ILogger logger + ) { _actorSystem = actorSystem; _logger = logger; @@ -19,18 +22,14 @@ public ActorSystemHostedService(ActorSystem actorSystem, ILogger - lcfg - .ReadFrom.Configuration(builder.Configuration) - .WriteTo.Console() - .WriteTo.Seq(builder.Configuration["SeqUrl"]!) - .Enrich.WithProperty("Service", Assembly.GetExecutingAssembly().GetName().Name)); - + builder.Host.UseSerilog( + (_, lcfg) => + lcfg.ReadFrom + .Configuration(builder.Configuration) + .WriteTo.Console() + .WriteTo.Seq(builder.Configuration["SeqUrl"]!) + .Enrich.WithProperty("Service", Assembly.GetExecutingAssembly().GetName().Name) + ); Console.WriteLine("Starting client"); builder.Services.AddSingleton(); @@ -36,23 +37,36 @@ app.UseSwagger(); app.UseSwaggerUI(); - app.MapPost("/runMessagingTest", - (HttpContext _, IServiceProvider provider, TestManager manager, [FromQuery] int parallelism, [FromQuery] int durationInSeconds) - => { - - var __ = SafeTask.Run( () => { - var test = provider.GetRequiredService(); - manager.TrackTest(cancel => test.RunTest(parallelism, durationInSeconds, cancel)); - return Task.CompletedTask; - } - ); + app.MapPost( + "/runMessagingTest", + ( + HttpContext _, + IServiceProvider provider, + TestManager manager, + [FromQuery] int parallelism, + [FromQuery] int durationInSeconds + ) => + { + var __ = SafeTask.Run(() => + { + var test = provider.GetRequiredService(); + manager.TrackTest(cancel => test.RunTest(parallelism, durationInSeconds, cancel)); + return Task.CompletedTask; + }); return Task.CompletedTask; } ); - app.MapPost("/runActivationTest", - (HttpContext _, IServiceProvider provider, TestManager manager, [FromQuery] int activationCount, [FromQuery] int parallelism) - => { + app.MapPost( + "/runActivationTest", + ( + HttpContext _, + IServiceProvider provider, + TestManager manager, + [FromQuery] int activationCount, + [FromQuery] int parallelism + ) => + { var test = provider.GetRequiredService(); manager.TrackTest(cancel => test.RunTest(activationCount, parallelism, cancel)); @@ -70,4 +84,4 @@ finally { Log.CloseAndFlush(); -} \ No newline at end of file +} diff --git a/benchmarks/SkyriseMini/Client/ProtoActorExtensions.cs b/benchmarks/SkyriseMini/Client/ProtoActorExtensions.cs index 877fe2190a..413ace3b1c 100644 --- a/benchmarks/SkyriseMini/Client/ProtoActorExtensions.cs +++ b/benchmarks/SkyriseMini/Client/ProtoActorExtensions.cs @@ -18,18 +18,23 @@ namespace SkyriseMini; public static class ProtoActorExtensions { - public static WebApplicationBuilder AddProtoActorTestServicesRaw(this WebApplicationBuilder builder) + public static WebApplicationBuilder AddProtoActorTestServicesRaw( + this WebApplicationBuilder builder + ) { builder.Services.AddSingleton(); - builder.Services.AddSingleton(provider => provider.GetRequiredService().Ping); - builder.Services.AddSingleton(provider => provider.GetRequiredService().Activate); + builder.Services.AddSingleton( + provider => provider.GetRequiredService().Ping + ); + builder.Services.AddSingleton( + provider => provider.GetRequiredService().Activate + ); return builder; } public static WebApplicationBuilder AddProtoActorClient(this WebApplicationBuilder builder) { - builder.Services.AddSingleton(provider => { var config = builder.Configuration.GetSection("ProtoActor"); @@ -43,7 +48,8 @@ public static WebApplicationBuilder AddProtoActorClient(this WebApplicationBuild var system = new ActorSystem(actorSystemConfig); - var remoteConfig = GrpcNetRemoteConfig.BindToLocalhost() + var remoteConfig = GrpcNetRemoteConfig + .BindToLocalhost() .WithProtoMessages(ProtoActorSut.Contracts.ProtosReflection.Descriptor) // .WithChannelOptions(new GrpcChannelOptions // { @@ -57,8 +63,11 @@ public static WebApplicationBuilder AddProtoActorClient(this WebApplicationBuild var clusterProvider = new ConsulProvider(new ConsulProviderConfig()); - var clusterConfig = ClusterConfig - .Setup(config["ClusterName"]!, clusterProvider, new PartitionIdentityLookup()); + var clusterConfig = ClusterConfig.Setup( + config["ClusterName"]!, + clusterProvider, + new PartitionIdentityLookup() + ); system .WithServiceProvider(provider) @@ -68,9 +77,9 @@ public static WebApplicationBuilder AddProtoActorClient(this WebApplicationBuild return system; }); - + builder.Services.AddHostedService(); return builder; } -} \ No newline at end of file +} diff --git a/benchmarks/SkyriseMini/Client/ProtoActorTestServicesRaw.cs b/benchmarks/SkyriseMini/Client/ProtoActorTestServicesRaw.cs index 63672b028d..797d0128c9 100644 --- a/benchmarks/SkyriseMini/Client/ProtoActorTestServicesRaw.cs +++ b/benchmarks/SkyriseMini/Client/ProtoActorTestServicesRaw.cs @@ -15,11 +15,19 @@ public ProtoActorTestServicesRaw(ActorSystem system) public async Task Ping(object handle, string name) { - var ci = handle as ClusterIdentity ?? - throw new ArgumentException($"Handle needs to be of type {nameof(ClusterIdentity)}", nameof(handle)); + var ci = + handle as ClusterIdentity + ?? throw new ArgumentException( + $"Handle needs to be of type {nameof(ClusterIdentity)}", + nameof(handle) + ); + + var pong = await _cluster.RequestAsync( + ci, + new PingMessage { Name = name }, + CancellationTokens.FromSeconds(5) + ); - var pong = await _cluster.RequestAsync(ci, new PingMessage { Name = name}, CancellationTokens.FromSeconds(5)); - var expectedResponse = "Hello " + name; if (pong == null) @@ -27,13 +35,19 @@ public async Task Ping(object handle, string name) throw new Exception("Request timed out"); } if (pong.Response != expectedResponse) - throw new Exception($"Received response '{pong.Response}' but expected '{expectedResponse}'"); + throw new Exception( + $"Received response '{pong.Response}' but expected '{expectedResponse}'" + ); } public async Task Activate(string id) { var ci = ClusterIdentity.Create(id, "PingPongRaw"); - var res = await _cluster.RequestAsync(ci, new PingMessage(), CancellationToken.None); + var res = await _cluster.RequestAsync( + ci, + new PingMessage(), + CancellationToken.None + ); return ci; } -} \ No newline at end of file +} diff --git a/benchmarks/SkyriseMini/Client/Tests/ActivationTest.cs b/benchmarks/SkyriseMini/Client/Tests/ActivationTest.cs index 054d0a2a64..e80915cd95 100644 --- a/benchmarks/SkyriseMini/Client/Tests/ActivationTest.cs +++ b/benchmarks/SkyriseMini/Client/Tests/ActivationTest.cs @@ -2,7 +2,6 @@ using System.Threading.Channels; using Microsoft.Extensions.Logging; - namespace SkyriseMini.Tests; public class ActivationTest @@ -22,16 +21,21 @@ public async Task RunTest(int activationCount, int parallelism, CancellationToke { _logger.LogInformation( "Starting activation test with activation count = {ActivationCount}, parallelism = {Parallelism}", - activationCount, parallelism); + activationCount, + parallelism + ); _logger.LogInformation("Preparing {ActivationCount} actor ids", activationCount); var actorIds = await PrepareActorIds(activationCount); - + var testDuration = await TestWorker(actorIds, parallelism, cancel); _logger.LogInformation( "Activation test completed, total activations = {TotalActivations}, duration = {TestDuration}, Throughput = {Throughput:F2} actors/s", - activationCount, testDuration, activationCount / testDuration.TotalSeconds); + activationCount, + testDuration, + activationCount / testDuration.TotalSeconds + ); } catch (Exception e) { @@ -45,40 +49,45 @@ static async Task> PrepareActorIds(int count) for (int i = 0; i < count; i++) await ch.Writer.WriteAsync(Guid.NewGuid().ToString("N")); - + ch.Writer.Complete(); return ch.Reader; } - - async Task TestWorker(ChannelReader actorIds, int parallelism, CancellationToken cancel) + + async Task TestWorker( + ChannelReader actorIds, + int parallelism, + CancellationToken cancel + ) { var overallStopwatch = new Stopwatch(); overallStopwatch.Start(); - var tasks = Enumerable.Range(1, parallelism).Select(async _ => - { - var activationStopwatch = new Stopwatch(); - - await foreach (var actorId in actorIds.ReadAllAsync(cancel)) + var tasks = Enumerable + .Range(1, parallelism) + .Select(async _ => { - try - { - activationStopwatch.Restart(); - await _activate(actorId); + var activationStopwatch = new Stopwatch(); - } - catch (Exception e) + await foreach (var actorId in actorIds.ReadAllAsync(cancel)) { - _logger.LogError(e, "Error during test"); + try + { + activationStopwatch.Restart(); + await _activate(actorId); + } + catch (Exception e) + { + _logger.LogError(e, "Error during test"); + } } - } - - activationStopwatch.Stop(); - }); + + activationStopwatch.Stop(); + }); await Task.WhenAll(tasks); overallStopwatch.Stop(); return overallStopwatch.Elapsed; } -} \ No newline at end of file +} diff --git a/benchmarks/SkyriseMini/Client/Tests/MessagingTest.cs b/benchmarks/SkyriseMini/Client/Tests/MessagingTest.cs index 35542da455..3471bed886 100644 --- a/benchmarks/SkyriseMini/Client/Tests/MessagingTest.cs +++ b/benchmarks/SkyriseMini/Client/Tests/MessagingTest.cs @@ -1,7 +1,6 @@ using System.Diagnostics; using Microsoft.Extensions.Logging; - namespace SkyriseMini.Tests; public class MessagingTest @@ -21,8 +20,11 @@ public async Task RunTest(int parallelism, int durationInSeconds, CancellationTo { try { - _logger.LogInformation("Starting messaging test with parallelism = {Parallelism}, duration = {Duration}s", parallelism, - durationInSeconds); + _logger.LogInformation( + "Starting messaging test with parallelism = {Parallelism}, duration = {Duration}s", + parallelism, + durationInSeconds + ); var actorIds = PrepareActorIds(parallelism); @@ -35,8 +37,12 @@ public async Task RunTest(int parallelism, int durationInSeconds, CancellationTo var (totalMessages, testDuration) = await TestWorker(handles, cts.Token); - _logger.LogInformation("Messaging test completed, total messages = {TotalMessages}, duration = {TestDuration}, Throughput = {Throughput:F2} msg/s", - totalMessages, testDuration, totalMessages / testDuration.TotalSeconds); + _logger.LogInformation( + "Messaging test completed, total messages = {TotalMessages}, duration = {TestDuration}, Throughput = {Throughput:F2} msg/s", + totalMessages, + testDuration, + totalMessages / testDuration.TotalSeconds + ); } catch (Exception e) { @@ -54,16 +60,19 @@ async Task ActivateActors(string[] actorIds) return tasks.Select(t => t.Result).ToArray(); } - async Task<(long TotalMessages, TimeSpan TestDuration)> TestWorker(object[] handles, CancellationToken cancel) + async Task<(long TotalMessages, TimeSpan TestDuration)> TestWorker( + object[] handles, + CancellationToken cancel + ) { var totalMessages = 0L; var overallStopwatch = new Stopwatch(); overallStopwatch.Start(); - bool error = false; var sw = Stopwatch.StartNew(); - var tasks = handles.Select(async handle => { + var tasks = handles.Select(async handle => + { while (!cancel.IsCancellationRequested && !error) { try @@ -74,7 +83,7 @@ async Task ActivateActors(string[] actorIds) if (res % 100000 == 0) { - var tps = (int)(totalMessages / (double) sw.ElapsedMilliseconds * 1000.0); + var tps = (int)(totalMessages / (double)sw.ElapsedMilliseconds * 1000.0); Console.WriteLine(tps); } } @@ -91,4 +100,4 @@ async Task ActivateActors(string[] actorIds) overallStopwatch.Stop(); return (totalMessages, overallStopwatch.Elapsed); } -} \ No newline at end of file +} diff --git a/benchmarks/SkyriseMini/Client/Tests/TestManager.cs b/benchmarks/SkyriseMini/Client/Tests/TestManager.cs index 2aefd1ef97..a9855811ae 100644 --- a/benchmarks/SkyriseMini/Client/Tests/TestManager.cs +++ b/benchmarks/SkyriseMini/Client/Tests/TestManager.cs @@ -19,4 +19,4 @@ public void TrackTest(Func runTest) } private bool TestIsCurrentlyRunning() => !_currentTest.IsCompleted; -} \ No newline at end of file +} diff --git a/benchmarks/SkyriseMini/Client/Tests/TestServices.cs b/benchmarks/SkyriseMini/Client/Tests/TestServices.cs index 002bbb6b91..c86f16fa03 100644 --- a/benchmarks/SkyriseMini/Client/Tests/TestServices.cs +++ b/benchmarks/SkyriseMini/Client/Tests/TestServices.cs @@ -2,4 +2,4 @@ public delegate Task Ping(object handle, string name); -public delegate Task Activate(string id); \ No newline at end of file +public delegate Task Activate(string id); diff --git a/benchmarks/SkyriseMini/Server/ActorSystemHostedService.cs b/benchmarks/SkyriseMini/Server/ActorSystemHostedService.cs index d3df7c4e3f..c88642b799 100644 --- a/benchmarks/SkyriseMini/Server/ActorSystemHostedService.cs +++ b/benchmarks/SkyriseMini/Server/ActorSystemHostedService.cs @@ -10,7 +10,10 @@ public class ActorSystemHostedService : IHostedService private readonly ActorSystem _actorSystem; private readonly ILogger _logger; - public ActorSystemHostedService(ActorSystem actorSystem, ILogger logger) + public ActorSystemHostedService( + ActorSystem actorSystem, + ILogger logger + ) { _actorSystem = actorSystem; _logger = logger; @@ -19,18 +22,14 @@ public ActorSystemHostedService(ActorSystem actorSystem, ILogger MessageCount = TestMeter.CreateCounter("app_message_total"); public static Counter ErrorCount = TestMeter.CreateCounter("app_errors_total"); - - public static Histogram - MessageLatency = TestMeter.CreateHistogram("app_message_latency", "seconds"); - + + public static Histogram MessageLatency = TestMeter.CreateHistogram( + "app_message_latency", + "seconds" + ); + public static MeterProviderBuilder AddTestMetrics(this MeterProviderBuilder builder) { builder .AddMeter(MeterName) - .AddView("app_message_latency", new ExplicitBucketHistogramConfiguration - { - Boundaries = new[] {0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10} - }); + .AddView( + "app_message_latency", + new ExplicitBucketHistogramConfiguration + { + Boundaries = new[] + { + 0.001, + 0.002, + 0.005, + 0.01, + 0.02, + 0.05, + 0.1, + 0.2, + 0.5, + 1, + 2, + 5, + 10 + } + } + ); return builder; } -} \ No newline at end of file +} diff --git a/benchmarks/SkyriseMini/Server/PingPongActorRaw.cs b/benchmarks/SkyriseMini/Server/PingPongActorRaw.cs index fec25b67e0..6bd704c2f9 100644 --- a/benchmarks/SkyriseMini/Server/PingPongActorRaw.cs +++ b/benchmarks/SkyriseMini/Server/PingPongActorRaw.cs @@ -7,7 +7,7 @@ public class PingPongActorRaw : IActor { private Task Ping(PingMessage request, IContext ctx) { - ctx.Respond(new PongMessage {Response = "Hello " + request.Name}); + ctx.Respond(new PongMessage { Response = "Hello " + request.Name }); return Task.CompletedTask; } @@ -20,4 +20,4 @@ public Task ReceiveAsync(IContext context) return Task.CompletedTask; } -} \ No newline at end of file +} diff --git a/benchmarks/SkyriseMini/Server/Program.cs b/benchmarks/SkyriseMini/Server/Program.cs index dd991b8247..dd1f3e76e0 100644 --- a/benchmarks/SkyriseMini/Server/Program.cs +++ b/benchmarks/SkyriseMini/Server/Program.cs @@ -8,14 +8,15 @@ try { - builder.Host.UseSerilog((_, lcfg) => - lcfg - .ReadFrom.Configuration(builder.Configuration) - .WriteTo.Console() - .WriteTo.Seq(builder.Configuration["SeqUrl"]!) - .Enrich.WithProperty("Service", Assembly.GetExecutingAssembly().GetName().Name) + builder.Host.UseSerilog( + (_, lcfg) => + lcfg.ReadFrom + .Configuration(builder.Configuration) + .WriteTo.Console() + .WriteTo.Seq(builder.Configuration["SeqUrl"]!) + .Enrich.WithProperty("Service", Assembly.GetExecutingAssembly().GetName().Name) ); - + Console.WriteLine("Starting server"); builder.AddProtoActorSUT(); var app = builder.Build(); @@ -29,4 +30,4 @@ finally { Log.CloseAndFlush(); -} \ No newline at end of file +} diff --git a/benchmarks/SkyriseMini/Server/ProtoActorExtensions.cs b/benchmarks/SkyriseMini/Server/ProtoActorExtensions.cs index 248fea91e1..6b53b81a91 100644 --- a/benchmarks/SkyriseMini/Server/ProtoActorExtensions.cs +++ b/benchmarks/SkyriseMini/Server/ProtoActorExtensions.cs @@ -18,7 +18,6 @@ namespace SkyriseMini; public static class ProtoActorExtensions { - public static WebApplicationBuilder AddProtoActorSUT(this WebApplicationBuilder builder) { builder.Services.AddSingleton(provider => @@ -34,7 +33,8 @@ public static WebApplicationBuilder AddProtoActorSUT(this WebApplicationBuilder var system = new ActorSystem(actorSystemConfig); - var remoteConfig = GrpcNetRemoteConfig.BindToLocalhost() + var remoteConfig = GrpcNetRemoteConfig + .BindToLocalhost() .WithProtoMessages(ProtoActorSut.Contracts.ProtosReflection.Descriptor) // .WithChannelOptions(new GrpcChannelOptions // { @@ -47,11 +47,11 @@ public static WebApplicationBuilder AddProtoActorSUT(this WebApplicationBuilder .WithLogLevelForDeserializationErrors(LogLevel.Critical); var clusterProvider = new ConsulProvider(new ConsulProviderConfig()); - + var clusterConfig = ClusterConfig .Setup(config["ClusterName"]!, clusterProvider, new PartitionIdentityLookup()) - .WithClusterKind("PingPongRaw",Props.FromProducer(() => new PingPongActorRaw()) ); - + .WithClusterKind("PingPongRaw", Props.FromProducer(() => new PingPongActorRaw())); + system .WithServiceProvider(provider) .WithRemote(remoteConfig) @@ -60,11 +60,9 @@ public static WebApplicationBuilder AddProtoActorSUT(this WebApplicationBuilder return system; }); - + builder.Services.AddHostedService(); return builder; } - - -} \ No newline at end of file +} diff --git a/benchmarks/SkyriseMini/Server/Tests/ActivationTest.cs b/benchmarks/SkyriseMini/Server/Tests/ActivationTest.cs index f7e1f4ba6b..3c2331f4ba 100644 --- a/benchmarks/SkyriseMini/Server/Tests/ActivationTest.cs +++ b/benchmarks/SkyriseMini/Server/Tests/ActivationTest.cs @@ -22,16 +22,21 @@ public async Task RunTest(int activationCount, int parallelism, CancellationToke { _logger.LogInformation( "Starting activation test with activation count = {ActivationCount}, parallelism = {Parallelism}", - activationCount, parallelism); + activationCount, + parallelism + ); _logger.LogInformation("Preparing {ActivationCount} actor ids", activationCount); var actorIds = await PrepareActorIds(activationCount); - + var testDuration = await TestWorker(actorIds, parallelism, cancel); _logger.LogInformation( "Activation test completed, total activations = {TotalActivations}, duration = {TestDuration}, Throughput = {Throughput:F2} actors/s", - activationCount, testDuration, activationCount / testDuration.TotalSeconds); + activationCount, + testDuration, + activationCount / testDuration.TotalSeconds + ); } catch (Exception e) { @@ -45,43 +50,51 @@ static async Task> PrepareActorIds(int count) for (int i = 0; i < count; i++) await ch.Writer.WriteAsync(Guid.NewGuid().ToString("N")); - + ch.Writer.Complete(); return ch.Reader; } - - async Task TestWorker(ChannelReader actorIds, int parallelism, CancellationToken cancel) + + async Task TestWorker( + ChannelReader actorIds, + int parallelism, + CancellationToken cancel + ) { var overallStopwatch = new Stopwatch(); overallStopwatch.Start(); - var tasks = Enumerable.Range(1, parallelism).Select(async _ => - { - var activationStopwatch = new Stopwatch(); - - await foreach (var actorId in actorIds.ReadAllAsync(cancel)) + var tasks = Enumerable + .Range(1, parallelism) + .Select(async _ => { - try - { - activationStopwatch.Restart(); - await _activate(actorId); - - TestMetrics.MessageLatency.Record(activationStopwatch.ElapsedTicks / (double)Stopwatch.Frequency); - TestMetrics.MessageCount.Add(1); - } - catch (Exception e) + var activationStopwatch = new Stopwatch(); + + await foreach (var actorId in actorIds.ReadAllAsync(cancel)) { - _logger.LogError(e, "Error during test"); - TestMetrics.ErrorCount.Add(1); + try + { + activationStopwatch.Restart(); + await _activate(actorId); + + TestMetrics.MessageLatency.Record( + activationStopwatch.ElapsedTicks / (double)Stopwatch.Frequency + ); + TestMetrics.MessageCount.Add(1); + } + catch (Exception e) + { + _logger.LogError(e, "Error during test"); + TestMetrics.ErrorCount.Add(1); + } } - } - - activationStopwatch.Stop(); - }); + + activationStopwatch.Stop(); + }); await Task.WhenAll(tasks); overallStopwatch.Stop(); return overallStopwatch.Elapsed; } -} \ No newline at end of file +} diff --git a/benchmarks/SkyriseMini/Server/Tests/MessagingTest.cs b/benchmarks/SkyriseMini/Server/Tests/MessagingTest.cs index 298f6e0345..84cf317f65 100644 --- a/benchmarks/SkyriseMini/Server/Tests/MessagingTest.cs +++ b/benchmarks/SkyriseMini/Server/Tests/MessagingTest.cs @@ -21,8 +21,11 @@ public async Task RunTest(int parallelism, int durationInSeconds, CancellationTo { try { - _logger.LogInformation("Starting messaging test with parallelism = {Parallelism}, duration = {Duration}s", parallelism, - durationInSeconds); + _logger.LogInformation( + "Starting messaging test with parallelism = {Parallelism}, duration = {Duration}s", + parallelism, + durationInSeconds + ); var actorIds = PrepareActorIds(parallelism); @@ -35,8 +38,12 @@ public async Task RunTest(int parallelism, int durationInSeconds, CancellationTo var (totalMessages, testDuration) = await TestWorker(handles, cts.Token); - _logger.LogInformation("Messaging test completed, total messages = {TotalMessages}, duration = {TestDuration}, Throughput = {Throughput:F2} msg/s", - totalMessages, testDuration, totalMessages / testDuration.TotalSeconds); + _logger.LogInformation( + "Messaging test completed, total messages = {TotalMessages}, duration = {TestDuration}, Throughput = {Throughput:F2} msg/s", + totalMessages, + testDuration, + totalMessages / testDuration.TotalSeconds + ); } catch (Exception e) { @@ -54,7 +61,10 @@ async Task ActivateActors(string[] actorIds) return tasks.Select(t => t.Result).ToArray(); } - async Task<(long TotalMessages, TimeSpan TestDuration)> TestWorker(object[] handles, CancellationToken cancel) + async Task<(long TotalMessages, TimeSpan TestDuration)> TestWorker( + object[] handles, + CancellationToken cancel + ) { var totalMessages = 0L; var overallStopwatch = new Stopwatch(); @@ -62,15 +72,17 @@ async Task ActivateActors(string[] actorIds) var tasks = handles.Select(async handle => { - var messageStopwatch = new Stopwatch(); + var messageStopwatch = new Stopwatch(); while (!cancel.IsCancellationRequested) { try { messageStopwatch.Restart(); await _ping(handle, Guid.NewGuid().ToString("N")); - - TestMetrics.MessageLatency.Record(messageStopwatch.ElapsedTicks / (double)Stopwatch.Frequency); + + TestMetrics.MessageLatency.Record( + messageStopwatch.ElapsedTicks / (double)Stopwatch.Frequency + ); TestMetrics.MessageCount.Add(1); Interlocked.Increment(ref totalMessages); } @@ -89,4 +101,4 @@ async Task ActivateActors(string[] actorIds) overallStopwatch.Stop(); return (totalMessages, overallStopwatch.Elapsed); } -} \ No newline at end of file +} diff --git a/benchmarks/SkyriseMini/Server/Tests/TestManager.cs b/benchmarks/SkyriseMini/Server/Tests/TestManager.cs index 2aefd1ef97..a9855811ae 100644 --- a/benchmarks/SkyriseMini/Server/Tests/TestManager.cs +++ b/benchmarks/SkyriseMini/Server/Tests/TestManager.cs @@ -19,4 +19,4 @@ public void TrackTest(Func runTest) } private bool TestIsCurrentlyRunning() => !_currentTest.IsCompleted; -} \ No newline at end of file +} diff --git a/benchmarks/SkyriseMini/Server/Tests/TestServices.cs b/benchmarks/SkyriseMini/Server/Tests/TestServices.cs index 002bbb6b91..c86f16fa03 100644 --- a/benchmarks/SkyriseMini/Server/Tests/TestServices.cs +++ b/benchmarks/SkyriseMini/Server/Tests/TestServices.cs @@ -2,4 +2,4 @@ public delegate Task Ping(object handle, string name); -public delegate Task Activate(string id); \ No newline at end of file +public delegate Task Activate(string id); diff --git a/benchmarks/SpawnBenchmark/Program.cs b/benchmarks/SpawnBenchmark/Program.cs index 97431e63bf..e0221b77ab 100644 --- a/benchmarks/SpawnBenchmark/Program.cs +++ b/benchmarks/SpawnBenchmark/Program.cs @@ -34,18 +34,21 @@ public Task ReceiveAsync(IContext context) switch (msg) { - case Request {Size: 1} r: + case Request { Size: 1 } r: context.Respond(r.Num); context.Stop(context.Self); return Task.CompletedTask; - case Request r: { + case Request r: + { _replies = r.Div; _replyTo = context.Sender; for (var i = 0; i < r.Div; i++) { var child = _system.Root.Spawn(Props); - context.Request(child, new Request + context.Request( + child, + new Request { Num = r.Num + i * (r.Size / r.Div), Size = r.Size / r.Div, @@ -56,7 +59,8 @@ public Task ReceiveAsync(IContext context) return Task.CompletedTask; } - case long res: { + case long res: + { _sum += res; _replies--; @@ -72,8 +76,16 @@ public Task ReceiveAsync(IContext context) } } - public static readonly Props Props = Props.FromProducer(s => new MyActor(s)).WithMailbox(() => new DefaultMailbox(new LockingUnboundedMailboxQueue(4), new LockingUnboundedMailboxQueue(4))).WithStartDeadline(TimeSpan.Zero); - + public static readonly Props Props = Props + .FromProducer(s => new MyActor(s)) + .WithMailbox( + () => + new DefaultMailbox( + new LockingUnboundedMailboxQueue(4), + new LockingUnboundedMailboxQueue(4) + ) + ) + .WithStartDeadline(TimeSpan.Zero); } class Program @@ -89,14 +101,16 @@ private static void Main() var pid = context.Spawn(MyActor.Props); var sw = Stopwatch.StartNew(); - var t = context.RequestAsync(pid, new Request + var t = context.RequestAsync( + pid, + new Request { Num = 0, Size = 1000000, Div = 10 } ); - + var res = t.Result; Console.WriteLine(sw.Elapsed); Console.WriteLine(res); @@ -105,4 +119,4 @@ private static void Main() // Console.ReadLine(); } -} \ No newline at end of file +}