diff --git a/src/Proto.Cluster/Gossip/Messages.cs b/src/Proto.Cluster/Gossip/Messages.cs new file mode 100644 index 0000000000..54635968d0 --- /dev/null +++ b/src/Proto.Cluster/Gossip/Messages.cs @@ -0,0 +1,14 @@ +using Proto.Remote; + +namespace Proto.Cluster.Gossip; + +public partial class GossipRequest : IRemotePriorityMessage +{ + +} + +public partial class GossipResponse : IRemotePriorityMessage +{ + +} + diff --git a/src/Proto.Cluster/Messages/Messages.cs b/src/Proto.Cluster/Messages/Messages.cs index c1b48c0a25..3e59f29bca 100644 --- a/src/Proto.Cluster/Messages/Messages.cs +++ b/src/Proto.Cluster/Messages/Messages.cs @@ -138,4 +138,9 @@ public static uint TopologyHash(IEnumerable members) return hash; } +} + +public partial class MemberHeartbeat : IRemotePriorityMessage +{ + } \ No newline at end of file diff --git a/src/Proto.Remote/Endpoints/Endpoint.cs b/src/Proto.Remote/Endpoints/Endpoint.cs index 06148d4e23..cbe25f00ff 100644 --- a/src/Proto.Remote/Endpoints/Endpoint.cs +++ b/src/Proto.Remote/Endpoints/Endpoint.cs @@ -22,8 +22,11 @@ public abstract class Endpoint : IEndpoint { private readonly CancellationTokenSource _cancellationTokenSource = new(); private readonly LogLevel _deserializationErrorLogLevel; +#pragma warning disable CS0618 // Type or member is obsolete private readonly ILogger _logger = Log.CreateLogger(); +#pragma warning restore CS0618 // Type or member is obsolete private readonly Channel _remoteDelivers = Channel.CreateUnbounded(); + private readonly Channel _remotePriorityDelivers = Channel.CreateUnbounded(); private readonly Task _sender; private readonly object _synLock = new(); private readonly Dictionary> _watchedActors = new(); @@ -112,8 +115,13 @@ public void SendMessage(PID target, object msg) } var env = new RemoteDeliver(header, message, target, sender); + var didWrite = message switch + { + IRemotePriorityMessage => _remotePriorityDelivers.Writer.TryWrite(env), + _ => _remoteDelivers.Writer.TryWrite(env) + }; - if (CancellationToken.IsCancellationRequested || !_remoteDelivers.Writer.TryWrite(env)) + if (CancellationToken.IsCancellationRequested || !didWrite) { _logger.LogWarning("[{SystemAddress}] Dropping message {MessageType} {MessagePayload} to {Target} from {Sender}", System.Address, @@ -372,17 +380,48 @@ private async Task RunAsync() try { var messages = new List(RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize); - - while (await _remoteDelivers.Reader.WaitToReadAsync(CancellationToken).ConfigureAwait(false)) + + while (true) { - while (_remoteDelivers.Reader.TryRead(out var remoteDeliver)) + var t1 = _remoteDelivers.Reader.WaitToReadAsync(CancellationToken).AsTask(); + var t2 = _remotePriorityDelivers.Reader.WaitToReadAsync(CancellationToken).AsTask(); + await Task.WhenAny(t1, t2); + var i = 0; + while (true) { - messages.Add(remoteDeliver); + var didWrite = false; + RemoteDeliver? remoteDeliver; + + //we donĀ“t need complete priority, we need "enough" important messages to get over + if (i++ % 10 == 0) + { + if (_remotePriorityDelivers.Reader.TryRead(out remoteDeliver)) + { + messages.Add(remoteDeliver); + didWrite = true; + } + + if (messages.Count >= RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize) + { + break; + } + } + + if (_remoteDelivers.Reader.TryRead(out remoteDeliver)) + { + messages.Add(remoteDeliver); + didWrite = true; + } if (messages.Count >= RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize) { break; } + + if (!didWrite) + { + break; + } } var batch = CreateBatch(messages); diff --git a/src/Proto.Remote/IRemotePriorityMessage.cs b/src/Proto.Remote/IRemotePriorityMessage.cs new file mode 100644 index 0000000000..236a525ece --- /dev/null +++ b/src/Proto.Remote/IRemotePriorityMessage.cs @@ -0,0 +1,6 @@ +namespace Proto.Remote; + +public interface IRemotePriorityMessage +{ + +} \ No newline at end of file