Skip to content

Commit

Permalink
remote priority
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed Jan 21, 2024
1 parent 21f0253 commit 0bf92e6
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 5 deletions.
43 changes: 38 additions & 5 deletions src/Proto.Remote/Endpoints/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ public abstract class Endpoint : IEndpoint
{
private readonly CancellationTokenSource _cancellationTokenSource = new();
private readonly LogLevel _deserializationErrorLogLevel;
#pragma warning disable CS0618 // Type or member is obsolete
private readonly ILogger _logger = Log.CreateLogger<Endpoint>();
#pragma warning restore CS0618 // Type or member is obsolete
private readonly Channel<RemoteDeliver> _remoteDelivers = Channel.CreateUnbounded<RemoteDeliver>();
private readonly Channel<RemoteDeliver> _remotePriorityDelivers = Channel.CreateUnbounded<RemoteDeliver>();
private readonly Task _sender;
private readonly object _synLock = new();
private readonly Dictionary<string, HashSet<PID>> _watchedActors = new();
Expand Down Expand Up @@ -112,8 +115,13 @@ public void SendMessage(PID target, object msg)
}

var env = new RemoteDeliver(header, message, target, sender);
var didWrite = message switch
{
IRemotePriorityMessage => _remotePriorityDelivers.Writer.TryWrite(env),
_ => _remoteDelivers.Writer.TryWrite(env)
};

if (CancellationToken.IsCancellationRequested || !_remoteDelivers.Writer.TryWrite(env))
if (CancellationToken.IsCancellationRequested || !didWrite)
{
_logger.LogWarning("[{SystemAddress}] Dropping message {MessageType} {MessagePayload} to {Target} from {Sender}",
System.Address,
Expand Down Expand Up @@ -372,17 +380,42 @@ private async Task RunAsync()
try
{
var messages = new List<RemoteDeliver>(RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize);

while (await _remoteDelivers.Reader.WaitToReadAsync(CancellationToken).ConfigureAwait(false))
while (true)
{
while (_remoteDelivers.Reader.TryRead(out var remoteDeliver))
var t1 = _remoteDelivers.Reader.WaitToReadAsync(CancellationToken).AsTask();
var t2 = _remotePriorityDelivers.Reader.WaitToReadAsync(CancellationToken).AsTask();
await Task.WhenAny(t1, t2);

while (true)
{
messages.Add(remoteDeliver);
var didWrite = false;
if (_remotePriorityDelivers.Reader.TryRead(out var remoteDeliver))
{
messages.Add(remoteDeliver);
didWrite = true;
}

if (messages.Count >= RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize)
{
break;
}

if (_remoteDelivers.Reader.TryRead(out remoteDeliver))
{
messages.Add(remoteDeliver);
didWrite = true;
}

if (messages.Count >= RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize)
{
break;
}

if (!didWrite)
{
break;
}
}

var batch = CreateBatch(messages);
Expand Down
6 changes: 6 additions & 0 deletions src/Proto.Remote/IRemotePriorityMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Proto.Remote;

public interface IRemotePriorityMessage
{

}

0 comments on commit 0bf92e6

Please sign in to comment.