From 0bf92e618bb0a168ed103025380e062136946902 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Sun, 21 Jan 2024 15:23:49 +0100 Subject: [PATCH 1/3] remote priority --- src/Proto.Remote/Endpoints/Endpoint.cs | 43 +++++++++++++++++++--- src/Proto.Remote/IRemotePriorityMessage.cs | 6 +++ 2 files changed, 44 insertions(+), 5 deletions(-) create mode 100644 src/Proto.Remote/IRemotePriorityMessage.cs diff --git a/src/Proto.Remote/Endpoints/Endpoint.cs b/src/Proto.Remote/Endpoints/Endpoint.cs index 06148d4e23..1d73fde762 100644 --- a/src/Proto.Remote/Endpoints/Endpoint.cs +++ b/src/Proto.Remote/Endpoints/Endpoint.cs @@ -22,8 +22,11 @@ public abstract class Endpoint : IEndpoint { private readonly CancellationTokenSource _cancellationTokenSource = new(); private readonly LogLevel _deserializationErrorLogLevel; +#pragma warning disable CS0618 // Type or member is obsolete private readonly ILogger _logger = Log.CreateLogger(); +#pragma warning restore CS0618 // Type or member is obsolete private readonly Channel _remoteDelivers = Channel.CreateUnbounded(); + private readonly Channel _remotePriorityDelivers = Channel.CreateUnbounded(); private readonly Task _sender; private readonly object _synLock = new(); private readonly Dictionary> _watchedActors = new(); @@ -112,8 +115,13 @@ public void SendMessage(PID target, object msg) } var env = new RemoteDeliver(header, message, target, sender); + var didWrite = message switch + { + IRemotePriorityMessage => _remotePriorityDelivers.Writer.TryWrite(env), + _ => _remoteDelivers.Writer.TryWrite(env) + }; - if (CancellationToken.IsCancellationRequested || !_remoteDelivers.Writer.TryWrite(env)) + if (CancellationToken.IsCancellationRequested || !didWrite) { _logger.LogWarning("[{SystemAddress}] Dropping message {MessageType} {MessagePayload} to {Target} from {Sender}", System.Address, @@ -372,17 +380,42 @@ private async Task RunAsync() try { var messages = new List(RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize); - - while (await _remoteDelivers.Reader.WaitToReadAsync(CancellationToken).ConfigureAwait(false)) + + while (true) { - while (_remoteDelivers.Reader.TryRead(out var remoteDeliver)) + var t1 = _remoteDelivers.Reader.WaitToReadAsync(CancellationToken).AsTask(); + var t2 = _remotePriorityDelivers.Reader.WaitToReadAsync(CancellationToken).AsTask(); + await Task.WhenAny(t1, t2); + + while (true) { - messages.Add(remoteDeliver); + var didWrite = false; + if (_remotePriorityDelivers.Reader.TryRead(out var remoteDeliver)) + { + messages.Add(remoteDeliver); + didWrite = true; + } + + if (messages.Count >= RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize) + { + break; + } + + if (_remoteDelivers.Reader.TryRead(out remoteDeliver)) + { + messages.Add(remoteDeliver); + didWrite = true; + } if (messages.Count >= RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize) { break; } + + if (!didWrite) + { + break; + } } var batch = CreateBatch(messages); diff --git a/src/Proto.Remote/IRemotePriorityMessage.cs b/src/Proto.Remote/IRemotePriorityMessage.cs new file mode 100644 index 0000000000..236a525ece --- /dev/null +++ b/src/Proto.Remote/IRemotePriorityMessage.cs @@ -0,0 +1,6 @@ +namespace Proto.Remote; + +public interface IRemotePriorityMessage +{ + +} \ No newline at end of file From 21108128d756de16da259b14581c722522e5a152 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Sun, 21 Jan 2024 15:28:08 +0100 Subject: [PATCH 2/3] introduce cluster priority messages --- src/Proto.Cluster/Gossip/Messages.cs | 14 ++++++++++++++ src/Proto.Cluster/Messages/Messages.cs | 5 +++++ 2 files changed, 19 insertions(+) create mode 100644 src/Proto.Cluster/Gossip/Messages.cs diff --git a/src/Proto.Cluster/Gossip/Messages.cs b/src/Proto.Cluster/Gossip/Messages.cs new file mode 100644 index 0000000000..54635968d0 --- /dev/null +++ b/src/Proto.Cluster/Gossip/Messages.cs @@ -0,0 +1,14 @@ +using Proto.Remote; + +namespace Proto.Cluster.Gossip; + +public partial class GossipRequest : IRemotePriorityMessage +{ + +} + +public partial class GossipResponse : IRemotePriorityMessage +{ + +} + diff --git a/src/Proto.Cluster/Messages/Messages.cs b/src/Proto.Cluster/Messages/Messages.cs index c1b48c0a25..3e59f29bca 100644 --- a/src/Proto.Cluster/Messages/Messages.cs +++ b/src/Proto.Cluster/Messages/Messages.cs @@ -138,4 +138,9 @@ public static uint TopologyHash(IEnumerable members) return hash; } +} + +public partial class MemberHeartbeat : IRemotePriorityMessage +{ + } \ No newline at end of file From 4ff0fb1cb5acfc6d2685b398f228e10467065332 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Sun, 21 Jan 2024 16:50:05 +0100 Subject: [PATCH 3/3] add modulus --- src/Proto.Remote/Endpoints/Endpoint.cs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/Proto.Remote/Endpoints/Endpoint.cs b/src/Proto.Remote/Endpoints/Endpoint.cs index 1d73fde762..cbe25f00ff 100644 --- a/src/Proto.Remote/Endpoints/Endpoint.cs +++ b/src/Proto.Remote/Endpoints/Endpoint.cs @@ -386,21 +386,27 @@ private async Task RunAsync() var t1 = _remoteDelivers.Reader.WaitToReadAsync(CancellationToken).AsTask(); var t2 = _remotePriorityDelivers.Reader.WaitToReadAsync(CancellationToken).AsTask(); await Task.WhenAny(t1, t2); - + var i = 0; while (true) { var didWrite = false; - if (_remotePriorityDelivers.Reader.TryRead(out var remoteDeliver)) + RemoteDeliver? remoteDeliver; + + //we donĀ“t need complete priority, we need "enough" important messages to get over + if (i++ % 10 == 0) { - messages.Add(remoteDeliver); - didWrite = true; + if (_remotePriorityDelivers.Reader.TryRead(out remoteDeliver)) + { + messages.Add(remoteDeliver); + didWrite = true; + } + + if (messages.Count >= RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize) + { + break; + } } - if (messages.Count >= RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize) - { - break; - } - if (_remoteDelivers.Reader.TryRead(out remoteDeliver)) { messages.Add(remoteDeliver);