From 0bf92e618bb0a168ed103025380e062136946902 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Sun, 21 Jan 2024 15:23:49 +0100 Subject: [PATCH] remote priority --- src/Proto.Remote/Endpoints/Endpoint.cs | 43 +++++++++++++++++++--- src/Proto.Remote/IRemotePriorityMessage.cs | 6 +++ 2 files changed, 44 insertions(+), 5 deletions(-) create mode 100644 src/Proto.Remote/IRemotePriorityMessage.cs diff --git a/src/Proto.Remote/Endpoints/Endpoint.cs b/src/Proto.Remote/Endpoints/Endpoint.cs index 06148d4e23..1d73fde762 100644 --- a/src/Proto.Remote/Endpoints/Endpoint.cs +++ b/src/Proto.Remote/Endpoints/Endpoint.cs @@ -22,8 +22,11 @@ public abstract class Endpoint : IEndpoint { private readonly CancellationTokenSource _cancellationTokenSource = new(); private readonly LogLevel _deserializationErrorLogLevel; +#pragma warning disable CS0618 // Type or member is obsolete private readonly ILogger _logger = Log.CreateLogger(); +#pragma warning restore CS0618 // Type or member is obsolete private readonly Channel _remoteDelivers = Channel.CreateUnbounded(); + private readonly Channel _remotePriorityDelivers = Channel.CreateUnbounded(); private readonly Task _sender; private readonly object _synLock = new(); private readonly Dictionary> _watchedActors = new(); @@ -112,8 +115,13 @@ public void SendMessage(PID target, object msg) } var env = new RemoteDeliver(header, message, target, sender); + var didWrite = message switch + { + IRemotePriorityMessage => _remotePriorityDelivers.Writer.TryWrite(env), + _ => _remoteDelivers.Writer.TryWrite(env) + }; - if (CancellationToken.IsCancellationRequested || !_remoteDelivers.Writer.TryWrite(env)) + if (CancellationToken.IsCancellationRequested || !didWrite) { _logger.LogWarning("[{SystemAddress}] Dropping message {MessageType} {MessagePayload} to {Target} from {Sender}", System.Address, @@ -372,17 +380,42 @@ private async Task RunAsync() try { var messages = new List(RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize); - - while (await _remoteDelivers.Reader.WaitToReadAsync(CancellationToken).ConfigureAwait(false)) + + while (true) { - while (_remoteDelivers.Reader.TryRead(out var remoteDeliver)) + var t1 = _remoteDelivers.Reader.WaitToReadAsync(CancellationToken).AsTask(); + var t2 = _remotePriorityDelivers.Reader.WaitToReadAsync(CancellationToken).AsTask(); + await Task.WhenAny(t1, t2); + + while (true) { - messages.Add(remoteDeliver); + var didWrite = false; + if (_remotePriorityDelivers.Reader.TryRead(out var remoteDeliver)) + { + messages.Add(remoteDeliver); + didWrite = true; + } + + if (messages.Count >= RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize) + { + break; + } + + if (_remoteDelivers.Reader.TryRead(out remoteDeliver)) + { + messages.Add(remoteDeliver); + didWrite = true; + } if (messages.Count >= RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize) { break; } + + if (!didWrite) + { + break; + } } var batch = CreateBatch(messages); diff --git a/src/Proto.Remote/IRemotePriorityMessage.cs b/src/Proto.Remote/IRemotePriorityMessage.cs new file mode 100644 index 0000000000..236a525ece --- /dev/null +++ b/src/Proto.Remote/IRemotePriorityMessage.cs @@ -0,0 +1,6 @@ +namespace Proto.Remote; + +public interface IRemotePriorityMessage +{ + +} \ No newline at end of file