Skip to content

Commit

Permalink
remove value task (#2069)
Browse files Browse the repository at this point in the history
* remove value task
  • Loading branch information
rogeralsing authored Oct 26, 2023
1 parent 55d06bb commit 082ff80
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 47 deletions.
66 changes: 33 additions & 33 deletions src/Proto.Actor/Context/ActorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ public void EscalateFailure(Exception reason, object? message)
}
}

public ValueTask InvokeSystemMessageAsync(SystemMessage msg)
public Task InvokeSystemMessageAsync(SystemMessage msg)
{
try
{
Expand All @@ -397,7 +397,7 @@ public ValueTask InvokeSystemMessageAsync(SystemMessage msg)
Unwatch uw => HandleUnwatch(uw),
Failure f => HandleFailureAsync(f),
Restart => HandleRestartAsync(),
SuspendMailbox or ResumeMailbox => default,
SuspendMailbox or ResumeMailbox => Task.CompletedTask,
Continuation cont => HandleContinuation(cont),
ProcessDiagnosticsRequest pdr => HandleProcessDiagnosticsRequest(pdr),
ReceiveTimeout _ => HandleReceiveTimeout(),
Expand All @@ -412,7 +412,7 @@ public ValueTask InvokeSystemMessageAsync(SystemMessage msg)
}
}

private ValueTask HandleStartedAsync()
private Task HandleStartedAsync()
{
if (_props.StartDeadline != TimeSpan.Zero)
{
Expand All @@ -421,7 +421,7 @@ private ValueTask HandleStartedAsync()

return InvokeUserMessageAsync(Started.Instance);

async ValueTask Await()
async Task Await()
{
var sw = Stopwatch.StartNew();
await InvokeUserMessageAsync(Started.Instance);
Expand All @@ -438,7 +438,7 @@ async ValueTask Await()
}
}

public ValueTask InvokeUserMessageAsync(object msg)
public Task InvokeUserMessageAsync(object msg)
{
if (!System.Metrics.Enabled)
{
Expand All @@ -448,7 +448,7 @@ public ValueTask InvokeUserMessageAsync(object msg)
return Await(this, msg, _metricTags);

//static, don't create a closure
static async ValueTask Await(ActorContext self, object msg, KeyValuePair<string, object?>[] metricTags)
static async Task Await(ActorContext self, object msg, KeyValuePair<string, object?>[] metricTags)
{
if (self.System.Metrics.Enabled)
{
Expand Down Expand Up @@ -482,13 +482,13 @@ public void RestartChildren(Exception reason, params PID[] pids) =>

public void ResumeChildren(params PID[] pids) => pids.SendSystemMessage(ResumeMailbox.Instance, System);

private async ValueTask HandleReceiveTimeout()
private async Task HandleReceiveTimeout()
{
_messageOrEnvelope = Proto.ReceiveTimeout.Instance;
await InvokeUserMessageAsync(Proto.ReceiveTimeout.Instance).ConfigureAwait(false);
}

private ValueTask HandleProcessDiagnosticsRequest(ProcessDiagnosticsRequest processDiagnosticsRequest)
private Task HandleProcessDiagnosticsRequest(ProcessDiagnosticsRequest processDiagnosticsRequest)
{
var diagnosticsString = "ActorType:" + Actor.GetType().Name + "\n";

Expand All @@ -504,18 +504,18 @@ private ValueTask HandleProcessDiagnosticsRequest(ProcessDiagnosticsRequest proc

processDiagnosticsRequest.Result.SetResult(diagnosticsString);

return default;
return Task.CompletedTask;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private ValueTask InternalInvokeUserMessageAsync(object msg)
private Task InternalInvokeUserMessageAsync(object msg)
{
if (_state == ContextState.Stopped)
{
//already stopped, send message to deadletter process
System.DeadLetter.SendUserMessage(Self, msg);

return default;
return Task.CompletedTask;
}

var influenceReceiveTimeout = false;
Expand Down Expand Up @@ -555,13 +555,13 @@ private ValueTask InternalInvokeUserMessageAsync(object msg)
_extras?.ResetReceiveTimeoutTimer(ReceiveTimeout);
}

return default;
return Task.CompletedTask;
}

return Await(this, t, influenceReceiveTimeout);

//static, dont create closure
static async ValueTask Await(ActorContext self, Task t, bool resetReceiveTimeout)
static async Task Await(ActorContext self, Task t, bool resetReceiveTimeout)
{
await t.ConfigureAwait(false);

Expand Down Expand Up @@ -602,15 +602,15 @@ static async Task Continue(Task target, Continuation cont, IContext ctx)
_ = Continue(target, cont, this);
}

private static ValueTask HandleUnknownSystemMessage(object msg)
private static Task HandleUnknownSystemMessage(object msg)
{
//TODO: sounds like a pretty severe issue if we end up here? what todo?
Logger.UnknownSystemMessage(msg);

return default;
return Task.CompletedTask;
}

private async ValueTask HandleContinuation(Continuation cont)
private async Task HandleContinuation(Continuation cont)
{
// Don't execute the continuation if the actor instance changed.
// Without this, Continuation's Action closure would execute with
Expand Down Expand Up @@ -701,7 +701,7 @@ private IActor IncarnateActor()
return _props.Producer(System, this);
}

private async ValueTask HandleRestartAsync()
private async Task HandleRestartAsync()
{
//restart invoked but system is stopping. stop the actor
if (System.Shutdown.IsCancellationRequested)
Expand All @@ -721,14 +721,14 @@ private async ValueTask HandleRestartAsync()
}
}

private ValueTask HandleUnwatch(Unwatch uw)
private Task HandleUnwatch(Unwatch uw)
{
_extras?.Unwatch(uw.Watcher);

return default;
return Task.CompletedTask;
}

private ValueTask HandleWatch(Watch w)
private Task HandleWatch(Watch w)
{
if (_state >= ContextState.Stopping)
{
Expand All @@ -739,10 +739,10 @@ private ValueTask HandleWatch(Watch w)
EnsureExtras().Watch(w.Watcher);
}

return default;
return Task.CompletedTask;
}

private ValueTask HandleFailureAsync(Failure msg)
private Task HandleFailureAsync(Failure msg)
{
switch (Actor)
{
Expand All @@ -761,11 +761,11 @@ private ValueTask HandleFailureAsync(Failure msg)
break;
}

return default;
return Task.CompletedTask;
}

// this will be triggered by the actors own Termination, _and_ terminating direct children, or Watchees
private async ValueTask HandleTerminatedAsync(Terminated msg)
private async Task HandleTerminatedAsync(Terminated msg)
{
//In the case of a Watchee terminating, this will have no effect, except that the terminate message is
//passed onto the user message Receive for user level handling
Expand All @@ -785,20 +785,20 @@ private void HandleRootFailure(Failure failure) =>
);

//Initiate stopping, not final
private ValueTask HandleStopAsync()
private Task HandleStopAsync()
{
if (_state >= ContextState.Stopping)
{
//already stopping or stopped
return default;
return Task.CompletedTask;
}

_state = ContextState.Stopping;
CancelReceiveTimeout();

return Await(this);

static async ValueTask Await(ActorContext self)
static async Task Await(ActorContext self)
{
try
{
Expand All @@ -814,7 +814,7 @@ static async ValueTask Await(ActorContext self)
}
}

private ValueTask StopAllChildren()
private Task StopAllChildren()
{
if (_extras != null)
{
Expand All @@ -829,11 +829,11 @@ private ValueTask StopAllChildren()

//intermediate stopping stage, waiting for children to stop
//this is directly triggered by StopAllChildren, or by Terminated messages from stopping children
private ValueTask TryRestartOrStopAsync()
private Task TryRestartOrStopAsync()
{
if (_extras?.Children.Count > 0)
{
return default;
return Task.CompletedTask;
}

CancelReceiveTimeout();
Expand All @@ -843,12 +843,12 @@ private ValueTask TryRestartOrStopAsync()
{
ContextState.Restarting => RestartAsync(),
ContextState.Stopping => FinalizeStopAsync(),
_ => default
_ => Task.CompletedTask
};
}

//Last and final termination step
private async ValueTask FinalizeStopAsync()
private async Task FinalizeStopAsync()
{
System.ProcessRegistry.Remove(Self);
//This is intentional
Expand All @@ -867,7 +867,7 @@ private async ValueTask FinalizeStopAsync()
_state = ContextState.Stopped;
}

private async ValueTask RestartAsync()
private async Task RestartAsync()
{
await DisposeActorIfDisposable().ConfigureAwait(false);
Actor = IncarnateActor();
Expand Down
8 changes: 4 additions & 4 deletions src/Proto.Actor/Mailbox/Dispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ public interface IMessageInvoker
{
CancellationTokenSource? CancellationTokenSource { get; }

ValueTask InvokeSystemMessageAsync(SystemMessage msg);
Task InvokeSystemMessageAsync(SystemMessage msg);

ValueTask InvokeUserMessageAsync(object msg);
Task InvokeUserMessageAsync(object msg);

void EscalateFailure(Exception reason, object? message);
}
Expand Down Expand Up @@ -123,9 +123,9 @@ internal class NoopInvoker : IMessageInvoker

public CancellationTokenSource CancellationTokenSource => throw new NotImplementedException();

public ValueTask InvokeSystemMessageAsync(SystemMessage msg) => throw new NotImplementedException();
public Task InvokeSystemMessageAsync(SystemMessage msg) => throw new NotImplementedException();

public ValueTask InvokeUserMessageAsync(object msg) => throw new NotImplementedException();
public Task InvokeUserMessageAsync(object msg) => throw new NotImplementedException();

public void EscalateFailure(Exception reason, object? message) => throw new NotImplementedException();
}
15 changes: 7 additions & 8 deletions src/Proto.Actor/Mailbox/Mailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
// -----------------------------------------------------------------------

using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand Down Expand Up @@ -93,7 +92,7 @@ public void PostUserMessage(object msg)
{
// if the message is a batch message, we unpack the content as individual messages in the mailbox
// feature Aka: Samkuvertering in Swedish...
if (msg is IMessageBatch || (msg is MessageEnvelope e && e.Message is IMessageBatch))
if (msg is IMessageBatch || msg is MessageEnvelope { Message: IMessageBatch })
{
var batch = (IMessageBatch)MessageEnvelope.UnwrapMessage(msg)!;
var messages = batch.GetMessages();
Expand Down Expand Up @@ -177,7 +176,7 @@ private static Task RunAsync(DefaultMailbox mailbox)

Interlocked.Exchange(ref mailbox._status, MailboxStatus.Idle);

if (mailbox._systemMessages.HasMessages || (!mailbox._suspended && mailbox._userMailbox.HasMessages))
if (mailbox._systemMessages.HasMessages || mailbox is { _suspended: false, _userMailbox.HasMessages: true })
{
mailbox.Schedule();
}
Expand All @@ -191,13 +190,13 @@ private static Task RunAsync(DefaultMailbox mailbox)

return Task.CompletedTask;

static async Task Await(DefaultMailbox self, ValueTask task)
static async Task Await(DefaultMailbox self, Task task)
{
await task.ConfigureAwait(false);

Interlocked.Exchange(ref self._status, MailboxStatus.Idle);

if (self._systemMessages.HasMessages || (!self._suspended && self._userMailbox.HasMessages))
if (self._systemMessages.HasMessages || self is { _suspended: false, _userMailbox.HasMessages: true })
{
self.Schedule();
}
Expand All @@ -211,7 +210,7 @@ static async Task Await(DefaultMailbox self, ValueTask task)
}
}

private ValueTask ProcessMessages()
private Task ProcessMessages()
{
object? msg = null;

Expand Down Expand Up @@ -278,9 +277,9 @@ private ValueTask ProcessMessages()
_invoker.EscalateFailure(e, msg);
}

return default;
return Task.CompletedTask;

static async ValueTask Await(object msg, ValueTask task, DefaultMailbox self)
static async Task Await(object msg, Task task, DefaultMailbox self)
{
try
{
Expand Down
4 changes: 2 additions & 2 deletions tests/Proto.TestFixtures/TestMailboxHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ public async void Schedule(Func<Task> runner)
}

// ReSharper disable once SuspiciousTypeConversion.Global
public async ValueTask InvokeSystemMessageAsync(SystemMessage msg) =>
public async Task InvokeSystemMessageAsync(SystemMessage msg) =>
await ((TestMessageWithTaskCompletionSource)msg).TaskCompletionSource.Task;

public async ValueTask InvokeUserMessageAsync(object msg) =>
public async Task InvokeUserMessageAsync(object msg) =>
await ((TestMessageWithTaskCompletionSource)msg).TaskCompletionSource.Task;

public void EscalateFailure(Exception reason, object message)
Expand Down

0 comments on commit 082ff80

Please sign in to comment.