diff --git a/src/Proto.Actor/Context/ActorContext.cs b/src/Proto.Actor/Context/ActorContext.cs index fbed4fcaf4..09feaabe23 100644 --- a/src/Proto.Actor/Context/ActorContext.cs +++ b/src/Proto.Actor/Context/ActorContext.cs @@ -384,7 +384,7 @@ public void EscalateFailure(Exception reason, object? message) } } - public ValueTask InvokeSystemMessageAsync(SystemMessage msg) + public Task InvokeSystemMessageAsync(SystemMessage msg) { try { @@ -397,7 +397,7 @@ public ValueTask InvokeSystemMessageAsync(SystemMessage msg) Unwatch uw => HandleUnwatch(uw), Failure f => HandleFailureAsync(f), Restart => HandleRestartAsync(), - SuspendMailbox or ResumeMailbox => default, + SuspendMailbox or ResumeMailbox => Task.CompletedTask, Continuation cont => HandleContinuation(cont), ProcessDiagnosticsRequest pdr => HandleProcessDiagnosticsRequest(pdr), ReceiveTimeout _ => HandleReceiveTimeout(), @@ -412,7 +412,7 @@ public ValueTask InvokeSystemMessageAsync(SystemMessage msg) } } - private ValueTask HandleStartedAsync() + private Task HandleStartedAsync() { if (_props.StartDeadline != TimeSpan.Zero) { @@ -421,7 +421,7 @@ private ValueTask HandleStartedAsync() return InvokeUserMessageAsync(Started.Instance); - async ValueTask Await() + async Task Await() { var sw = Stopwatch.StartNew(); await InvokeUserMessageAsync(Started.Instance); @@ -438,7 +438,7 @@ async ValueTask Await() } } - public ValueTask InvokeUserMessageAsync(object msg) + public Task InvokeUserMessageAsync(object msg) { if (!System.Metrics.Enabled) { @@ -448,7 +448,7 @@ public ValueTask InvokeUserMessageAsync(object msg) return Await(this, msg, _metricTags); //static, don't create a closure - static async ValueTask Await(ActorContext self, object msg, KeyValuePair[] metricTags) + static async Task Await(ActorContext self, object msg, KeyValuePair[] metricTags) { if (self.System.Metrics.Enabled) { @@ -482,13 +482,13 @@ public void RestartChildren(Exception reason, params PID[] pids) => public void ResumeChildren(params PID[] pids) => pids.SendSystemMessage(ResumeMailbox.Instance, System); - private async ValueTask HandleReceiveTimeout() + private async Task HandleReceiveTimeout() { _messageOrEnvelope = Proto.ReceiveTimeout.Instance; await InvokeUserMessageAsync(Proto.ReceiveTimeout.Instance).ConfigureAwait(false); } - private ValueTask HandleProcessDiagnosticsRequest(ProcessDiagnosticsRequest processDiagnosticsRequest) + private Task HandleProcessDiagnosticsRequest(ProcessDiagnosticsRequest processDiagnosticsRequest) { var diagnosticsString = "ActorType:" + Actor.GetType().Name + "\n"; @@ -504,18 +504,18 @@ private ValueTask HandleProcessDiagnosticsRequest(ProcessDiagnosticsRequest proc processDiagnosticsRequest.Result.SetResult(diagnosticsString); - return default; + return Task.CompletedTask; } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private ValueTask InternalInvokeUserMessageAsync(object msg) + private Task InternalInvokeUserMessageAsync(object msg) { if (_state == ContextState.Stopped) { //already stopped, send message to deadletter process System.DeadLetter.SendUserMessage(Self, msg); - return default; + return Task.CompletedTask; } var influenceReceiveTimeout = false; @@ -555,13 +555,13 @@ private ValueTask InternalInvokeUserMessageAsync(object msg) _extras?.ResetReceiveTimeoutTimer(ReceiveTimeout); } - return default; + return Task.CompletedTask; } return Await(this, t, influenceReceiveTimeout); //static, dont create closure - static async ValueTask Await(ActorContext self, Task t, bool resetReceiveTimeout) + static async Task Await(ActorContext self, Task t, bool resetReceiveTimeout) { await t.ConfigureAwait(false); @@ -602,15 +602,15 @@ static async Task Continue(Task target, Continuation cont, IContext ctx) _ = Continue(target, cont, this); } - private static ValueTask HandleUnknownSystemMessage(object msg) + private static Task HandleUnknownSystemMessage(object msg) { //TODO: sounds like a pretty severe issue if we end up here? what todo? Logger.UnknownSystemMessage(msg); - return default; + return Task.CompletedTask; } - private async ValueTask HandleContinuation(Continuation cont) + private async Task HandleContinuation(Continuation cont) { // Don't execute the continuation if the actor instance changed. // Without this, Continuation's Action closure would execute with @@ -701,7 +701,7 @@ private IActor IncarnateActor() return _props.Producer(System, this); } - private async ValueTask HandleRestartAsync() + private async Task HandleRestartAsync() { //restart invoked but system is stopping. stop the actor if (System.Shutdown.IsCancellationRequested) @@ -721,14 +721,14 @@ private async ValueTask HandleRestartAsync() } } - private ValueTask HandleUnwatch(Unwatch uw) + private Task HandleUnwatch(Unwatch uw) { _extras?.Unwatch(uw.Watcher); - return default; + return Task.CompletedTask; } - private ValueTask HandleWatch(Watch w) + private Task HandleWatch(Watch w) { if (_state >= ContextState.Stopping) { @@ -739,10 +739,10 @@ private ValueTask HandleWatch(Watch w) EnsureExtras().Watch(w.Watcher); } - return default; + return Task.CompletedTask; } - private ValueTask HandleFailureAsync(Failure msg) + private Task HandleFailureAsync(Failure msg) { switch (Actor) { @@ -761,11 +761,11 @@ private ValueTask HandleFailureAsync(Failure msg) break; } - return default; + return Task.CompletedTask; } // this will be triggered by the actors own Termination, _and_ terminating direct children, or Watchees - private async ValueTask HandleTerminatedAsync(Terminated msg) + private async Task HandleTerminatedAsync(Terminated msg) { //In the case of a Watchee terminating, this will have no effect, except that the terminate message is //passed onto the user message Receive for user level handling @@ -785,12 +785,12 @@ private void HandleRootFailure(Failure failure) => ); //Initiate stopping, not final - private ValueTask HandleStopAsync() + private Task HandleStopAsync() { if (_state >= ContextState.Stopping) { //already stopping or stopped - return default; + return Task.CompletedTask; } _state = ContextState.Stopping; @@ -798,7 +798,7 @@ private ValueTask HandleStopAsync() return Await(this); - static async ValueTask Await(ActorContext self) + static async Task Await(ActorContext self) { try { @@ -814,7 +814,7 @@ static async ValueTask Await(ActorContext self) } } - private ValueTask StopAllChildren() + private Task StopAllChildren() { if (_extras != null) { @@ -829,11 +829,11 @@ private ValueTask StopAllChildren() //intermediate stopping stage, waiting for children to stop //this is directly triggered by StopAllChildren, or by Terminated messages from stopping children - private ValueTask TryRestartOrStopAsync() + private Task TryRestartOrStopAsync() { if (_extras?.Children.Count > 0) { - return default; + return Task.CompletedTask; } CancelReceiveTimeout(); @@ -843,12 +843,12 @@ private ValueTask TryRestartOrStopAsync() { ContextState.Restarting => RestartAsync(), ContextState.Stopping => FinalizeStopAsync(), - _ => default + _ => Task.CompletedTask }; } //Last and final termination step - private async ValueTask FinalizeStopAsync() + private async Task FinalizeStopAsync() { System.ProcessRegistry.Remove(Self); //This is intentional @@ -867,7 +867,7 @@ private async ValueTask FinalizeStopAsync() _state = ContextState.Stopped; } - private async ValueTask RestartAsync() + private async Task RestartAsync() { await DisposeActorIfDisposable().ConfigureAwait(false); Actor = IncarnateActor(); diff --git a/src/Proto.Actor/Mailbox/Dispatcher.cs b/src/Proto.Actor/Mailbox/Dispatcher.cs index 33eda792d9..3f4f671505 100644 --- a/src/Proto.Actor/Mailbox/Dispatcher.cs +++ b/src/Proto.Actor/Mailbox/Dispatcher.cs @@ -14,9 +14,9 @@ public interface IMessageInvoker { CancellationTokenSource? CancellationTokenSource { get; } - ValueTask InvokeSystemMessageAsync(SystemMessage msg); + Task InvokeSystemMessageAsync(SystemMessage msg); - ValueTask InvokeUserMessageAsync(object msg); + Task InvokeUserMessageAsync(object msg); void EscalateFailure(Exception reason, object? message); } @@ -123,9 +123,9 @@ internal class NoopInvoker : IMessageInvoker public CancellationTokenSource CancellationTokenSource => throw new NotImplementedException(); - public ValueTask InvokeSystemMessageAsync(SystemMessage msg) => throw new NotImplementedException(); + public Task InvokeSystemMessageAsync(SystemMessage msg) => throw new NotImplementedException(); - public ValueTask InvokeUserMessageAsync(object msg) => throw new NotImplementedException(); + public Task InvokeUserMessageAsync(object msg) => throw new NotImplementedException(); public void EscalateFailure(Exception reason, object? message) => throw new NotImplementedException(); } \ No newline at end of file diff --git a/src/Proto.Actor/Mailbox/Mailbox.cs b/src/Proto.Actor/Mailbox/Mailbox.cs index 3888bde132..21dc3c7014 100644 --- a/src/Proto.Actor/Mailbox/Mailbox.cs +++ b/src/Proto.Actor/Mailbox/Mailbox.cs @@ -5,7 +5,6 @@ // ----------------------------------------------------------------------- using System; -using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -93,7 +92,7 @@ public void PostUserMessage(object msg) { // if the message is a batch message, we unpack the content as individual messages in the mailbox // feature Aka: Samkuvertering in Swedish... - if (msg is IMessageBatch || (msg is MessageEnvelope e && e.Message is IMessageBatch)) + if (msg is IMessageBatch || msg is MessageEnvelope { Message: IMessageBatch }) { var batch = (IMessageBatch)MessageEnvelope.UnwrapMessage(msg)!; var messages = batch.GetMessages(); @@ -177,7 +176,7 @@ private static Task RunAsync(DefaultMailbox mailbox) Interlocked.Exchange(ref mailbox._status, MailboxStatus.Idle); - if (mailbox._systemMessages.HasMessages || (!mailbox._suspended && mailbox._userMailbox.HasMessages)) + if (mailbox._systemMessages.HasMessages || mailbox is { _suspended: false, _userMailbox.HasMessages: true }) { mailbox.Schedule(); } @@ -191,13 +190,13 @@ private static Task RunAsync(DefaultMailbox mailbox) return Task.CompletedTask; - static async Task Await(DefaultMailbox self, ValueTask task) + static async Task Await(DefaultMailbox self, Task task) { await task.ConfigureAwait(false); Interlocked.Exchange(ref self._status, MailboxStatus.Idle); - if (self._systemMessages.HasMessages || (!self._suspended && self._userMailbox.HasMessages)) + if (self._systemMessages.HasMessages || self is { _suspended: false, _userMailbox.HasMessages: true }) { self.Schedule(); } @@ -211,7 +210,7 @@ static async Task Await(DefaultMailbox self, ValueTask task) } } - private ValueTask ProcessMessages() + private Task ProcessMessages() { object? msg = null; @@ -278,9 +277,9 @@ private ValueTask ProcessMessages() _invoker.EscalateFailure(e, msg); } - return default; + return Task.CompletedTask; - static async ValueTask Await(object msg, ValueTask task, DefaultMailbox self) + static async Task Await(object msg, Task task, DefaultMailbox self) { try { diff --git a/tests/Proto.TestFixtures/TestMailboxHandler.cs b/tests/Proto.TestFixtures/TestMailboxHandler.cs index 7e20657536..47e956e9de 100644 --- a/tests/Proto.TestFixtures/TestMailboxHandler.cs +++ b/tests/Proto.TestFixtures/TestMailboxHandler.cs @@ -32,10 +32,10 @@ public async void Schedule(Func runner) } // ReSharper disable once SuspiciousTypeConversion.Global - public async ValueTask InvokeSystemMessageAsync(SystemMessage msg) => + public async Task InvokeSystemMessageAsync(SystemMessage msg) => await ((TestMessageWithTaskCompletionSource)msg).TaskCompletionSource.Task; - public async ValueTask InvokeUserMessageAsync(object msg) => + public async Task InvokeUserMessageAsync(object msg) => await ((TestMessageWithTaskCompletionSource)msg).TaskCompletionSource.Task; public void EscalateFailure(Exception reason, object message)