Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove value task #2069

Merged
merged 3 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 33 additions & 33 deletions src/Proto.Actor/Context/ActorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ public void EscalateFailure(Exception reason, object? message)
}
}

public ValueTask InvokeSystemMessageAsync(SystemMessage msg)
public Task InvokeSystemMessageAsync(SystemMessage msg)
{
try
{
Expand All @@ -397,7 +397,7 @@ public ValueTask InvokeSystemMessageAsync(SystemMessage msg)
Unwatch uw => HandleUnwatch(uw),
Failure f => HandleFailureAsync(f),
Restart => HandleRestartAsync(),
SuspendMailbox or ResumeMailbox => default,
SuspendMailbox or ResumeMailbox => Task.CompletedTask,
Continuation cont => HandleContinuation(cont),
ProcessDiagnosticsRequest pdr => HandleProcessDiagnosticsRequest(pdr),
ReceiveTimeout _ => HandleReceiveTimeout(),
Expand All @@ -412,7 +412,7 @@ public ValueTask InvokeSystemMessageAsync(SystemMessage msg)
}
}

private ValueTask HandleStartedAsync()
private Task HandleStartedAsync()
{
if (_props.StartDeadline != TimeSpan.Zero)
{
Expand All @@ -421,7 +421,7 @@ private ValueTask HandleStartedAsync()

return InvokeUserMessageAsync(Started.Instance);

async ValueTask Await()
async Task Await()
{
var sw = Stopwatch.StartNew();
await InvokeUserMessageAsync(Started.Instance);
Expand All @@ -438,7 +438,7 @@ async ValueTask Await()
}
}

public ValueTask InvokeUserMessageAsync(object msg)
public Task InvokeUserMessageAsync(object msg)
{
if (!System.Metrics.Enabled)
{
Expand All @@ -448,7 +448,7 @@ public ValueTask InvokeUserMessageAsync(object msg)
return Await(this, msg, _metricTags);

//static, don't create a closure
static async ValueTask Await(ActorContext self, object msg, KeyValuePair<string, object?>[] metricTags)
static async Task Await(ActorContext self, object msg, KeyValuePair<string, object?>[] metricTags)
{
if (self.System.Metrics.Enabled)
{
Expand Down Expand Up @@ -482,13 +482,13 @@ public void RestartChildren(Exception reason, params PID[] pids) =>

public void ResumeChildren(params PID[] pids) => pids.SendSystemMessage(ResumeMailbox.Instance, System);

private async ValueTask HandleReceiveTimeout()
private async Task HandleReceiveTimeout()
{
_messageOrEnvelope = Proto.ReceiveTimeout.Instance;
await InvokeUserMessageAsync(Proto.ReceiveTimeout.Instance).ConfigureAwait(false);
}

private ValueTask HandleProcessDiagnosticsRequest(ProcessDiagnosticsRequest processDiagnosticsRequest)
private Task HandleProcessDiagnosticsRequest(ProcessDiagnosticsRequest processDiagnosticsRequest)
{
var diagnosticsString = "ActorType:" + Actor.GetType().Name + "\n";

Expand All @@ -504,18 +504,18 @@ private ValueTask HandleProcessDiagnosticsRequest(ProcessDiagnosticsRequest proc

processDiagnosticsRequest.Result.SetResult(diagnosticsString);

return default;
return Task.CompletedTask;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private ValueTask InternalInvokeUserMessageAsync(object msg)
private Task InternalInvokeUserMessageAsync(object msg)
{
if (_state == ContextState.Stopped)
{
//already stopped, send message to deadletter process
System.DeadLetter.SendUserMessage(Self, msg);

return default;
return Task.CompletedTask;
}

var influenceReceiveTimeout = false;
Expand Down Expand Up @@ -555,13 +555,13 @@ private ValueTask InternalInvokeUserMessageAsync(object msg)
_extras?.ResetReceiveTimeoutTimer(ReceiveTimeout);
}

return default;
return Task.CompletedTask;
}

return Await(this, t, influenceReceiveTimeout);

//static, dont create closure
static async ValueTask Await(ActorContext self, Task t, bool resetReceiveTimeout)
static async Task Await(ActorContext self, Task t, bool resetReceiveTimeout)
{
await t.ConfigureAwait(false);

Expand Down Expand Up @@ -602,15 +602,15 @@ static async Task Continue(Task target, Continuation cont, IContext ctx)
_ = Continue(target, cont, this);
}

private static ValueTask HandleUnknownSystemMessage(object msg)
private static Task HandleUnknownSystemMessage(object msg)
{
//TODO: sounds like a pretty severe issue if we end up here? what todo?
Logger.UnknownSystemMessage(msg);

return default;
return Task.CompletedTask;
}

private async ValueTask HandleContinuation(Continuation cont)
private async Task HandleContinuation(Continuation cont)
{
// Don't execute the continuation if the actor instance changed.
// Without this, Continuation's Action closure would execute with
Expand Down Expand Up @@ -701,7 +701,7 @@ private IActor IncarnateActor()
return _props.Producer(System, this);
}

private async ValueTask HandleRestartAsync()
private async Task HandleRestartAsync()
{
//restart invoked but system is stopping. stop the actor
if (System.Shutdown.IsCancellationRequested)
Expand All @@ -721,14 +721,14 @@ private async ValueTask HandleRestartAsync()
}
}

private ValueTask HandleUnwatch(Unwatch uw)
private Task HandleUnwatch(Unwatch uw)
{
_extras?.Unwatch(uw.Watcher);

return default;
return Task.CompletedTask;
}

private ValueTask HandleWatch(Watch w)
private Task HandleWatch(Watch w)
{
if (_state >= ContextState.Stopping)
{
Expand All @@ -739,10 +739,10 @@ private ValueTask HandleWatch(Watch w)
EnsureExtras().Watch(w.Watcher);
}

return default;
return Task.CompletedTask;
}

private ValueTask HandleFailureAsync(Failure msg)
private Task HandleFailureAsync(Failure msg)
{
switch (Actor)
{
Expand All @@ -761,11 +761,11 @@ private ValueTask HandleFailureAsync(Failure msg)
break;
}

return default;
return Task.CompletedTask;
}

// this will be triggered by the actors own Termination, _and_ terminating direct children, or Watchees
private async ValueTask HandleTerminatedAsync(Terminated msg)
private async Task HandleTerminatedAsync(Terminated msg)
{
//In the case of a Watchee terminating, this will have no effect, except that the terminate message is
//passed onto the user message Receive for user level handling
Expand All @@ -785,20 +785,20 @@ private void HandleRootFailure(Failure failure) =>
);

//Initiate stopping, not final
private ValueTask HandleStopAsync()
private Task HandleStopAsync()
{
if (_state >= ContextState.Stopping)
{
//already stopping or stopped
return default;
return Task.CompletedTask;
}

_state = ContextState.Stopping;
CancelReceiveTimeout();

return Await(this);

static async ValueTask Await(ActorContext self)
static async Task Await(ActorContext self)
{
try
{
Expand All @@ -814,7 +814,7 @@ static async ValueTask Await(ActorContext self)
}
}

private ValueTask StopAllChildren()
private Task StopAllChildren()
{
if (_extras != null)
{
Expand All @@ -829,11 +829,11 @@ private ValueTask StopAllChildren()

//intermediate stopping stage, waiting for children to stop
//this is directly triggered by StopAllChildren, or by Terminated messages from stopping children
private ValueTask TryRestartOrStopAsync()
private Task TryRestartOrStopAsync()
{
if (_extras?.Children.Count > 0)
{
return default;
return Task.CompletedTask;
}

CancelReceiveTimeout();
Expand All @@ -843,12 +843,12 @@ private ValueTask TryRestartOrStopAsync()
{
ContextState.Restarting => RestartAsync(),
ContextState.Stopping => FinalizeStopAsync(),
_ => default
_ => Task.CompletedTask
};
}

//Last and final termination step
private async ValueTask FinalizeStopAsync()
private async Task FinalizeStopAsync()
{
System.ProcessRegistry.Remove(Self);
//This is intentional
Expand All @@ -867,7 +867,7 @@ private async ValueTask FinalizeStopAsync()
_state = ContextState.Stopped;
}

private async ValueTask RestartAsync()
private async Task RestartAsync()
{
await DisposeActorIfDisposable().ConfigureAwait(false);
Actor = IncarnateActor();
Expand Down
8 changes: 4 additions & 4 deletions src/Proto.Actor/Mailbox/Dispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ public interface IMessageInvoker
{
CancellationTokenSource? CancellationTokenSource { get; }

ValueTask InvokeSystemMessageAsync(SystemMessage msg);
Task InvokeSystemMessageAsync(SystemMessage msg);

ValueTask InvokeUserMessageAsync(object msg);
Task InvokeUserMessageAsync(object msg);

void EscalateFailure(Exception reason, object? message);
}
Expand Down Expand Up @@ -123,9 +123,9 @@ internal class NoopInvoker : IMessageInvoker

public CancellationTokenSource CancellationTokenSource => throw new NotImplementedException();

public ValueTask InvokeSystemMessageAsync(SystemMessage msg) => throw new NotImplementedException();
public Task InvokeSystemMessageAsync(SystemMessage msg) => throw new NotImplementedException();

public ValueTask InvokeUserMessageAsync(object msg) => throw new NotImplementedException();
public Task InvokeUserMessageAsync(object msg) => throw new NotImplementedException();

public void EscalateFailure(Exception reason, object? message) => throw new NotImplementedException();
}
15 changes: 7 additions & 8 deletions src/Proto.Actor/Mailbox/Mailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
// -----------------------------------------------------------------------

using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand Down Expand Up @@ -93,7 +92,7 @@ public void PostUserMessage(object msg)
{
// if the message is a batch message, we unpack the content as individual messages in the mailbox
// feature Aka: Samkuvertering in Swedish...
if (msg is IMessageBatch || (msg is MessageEnvelope e && e.Message is IMessageBatch))
if (msg is IMessageBatch || msg is MessageEnvelope { Message: IMessageBatch })
{
var batch = (IMessageBatch)MessageEnvelope.UnwrapMessage(msg)!;
var messages = batch.GetMessages();
Expand Down Expand Up @@ -177,7 +176,7 @@ private static Task RunAsync(DefaultMailbox mailbox)

Interlocked.Exchange(ref mailbox._status, MailboxStatus.Idle);

if (mailbox._systemMessages.HasMessages || (!mailbox._suspended && mailbox._userMailbox.HasMessages))
if (mailbox._systemMessages.HasMessages || mailbox is { _suspended: false, _userMailbox.HasMessages: true })
{
mailbox.Schedule();
}
Expand All @@ -191,13 +190,13 @@ private static Task RunAsync(DefaultMailbox mailbox)

return Task.CompletedTask;

static async Task Await(DefaultMailbox self, ValueTask task)
static async Task Await(DefaultMailbox self, Task task)
{
await task.ConfigureAwait(false);

Interlocked.Exchange(ref self._status, MailboxStatus.Idle);

if (self._systemMessages.HasMessages || (!self._suspended && self._userMailbox.HasMessages))
if (self._systemMessages.HasMessages || self is { _suspended: false, _userMailbox.HasMessages: true })
{
self.Schedule();
}
Expand All @@ -211,7 +210,7 @@ static async Task Await(DefaultMailbox self, ValueTask task)
}
}

private ValueTask ProcessMessages()
private Task ProcessMessages()
{
object? msg = null;

Expand Down Expand Up @@ -278,9 +277,9 @@ private ValueTask ProcessMessages()
_invoker.EscalateFailure(e, msg);
}

return default;
return Task.CompletedTask;

static async ValueTask Await(object msg, ValueTask task, DefaultMailbox self)
static async Task Await(object msg, Task task, DefaultMailbox self)
{
try
{
Expand Down
4 changes: 2 additions & 2 deletions tests/Proto.TestFixtures/TestMailboxHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ public async void Schedule(Func<Task> runner)
}

// ReSharper disable once SuspiciousTypeConversion.Global
public async ValueTask InvokeSystemMessageAsync(SystemMessage msg) =>
public async Task InvokeSystemMessageAsync(SystemMessage msg) =>
await ((TestMessageWithTaskCompletionSource)msg).TaskCompletionSource.Task;

public async ValueTask InvokeUserMessageAsync(object msg) =>
public async Task InvokeUserMessageAsync(object msg) =>
await ((TestMessageWithTaskCompletionSource)msg).TaskCompletionSource.Task;

public void EscalateFailure(Exception reason, object message)
Expand Down
Loading