Skip to content

Commit

Permalink
Priority remote (#2086)
Browse files Browse the repository at this point in the history
* remote priority
* introduce cluster priority messages
* add modulus
  • Loading branch information
rogeralsing authored Jan 21, 2024
1 parent 21f0253 commit 36797fa
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 5 deletions.
14 changes: 14 additions & 0 deletions src/Proto.Cluster/Gossip/Messages.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using Proto.Remote;

namespace Proto.Cluster.Gossip;

public partial class GossipRequest : IRemotePriorityMessage
{

}

public partial class GossipResponse : IRemotePriorityMessage
{

}

5 changes: 5 additions & 0 deletions src/Proto.Cluster/Messages/Messages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,9 @@ public static uint TopologyHash(IEnumerable<Member> members)

return hash;
}
}

public partial class MemberHeartbeat : IRemotePriorityMessage
{

}
49 changes: 44 additions & 5 deletions src/Proto.Remote/Endpoints/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ public abstract class Endpoint : IEndpoint
{
private readonly CancellationTokenSource _cancellationTokenSource = new();
private readonly LogLevel _deserializationErrorLogLevel;
#pragma warning disable CS0618 // Type or member is obsolete
private readonly ILogger _logger = Log.CreateLogger<Endpoint>();
#pragma warning restore CS0618 // Type or member is obsolete
private readonly Channel<RemoteDeliver> _remoteDelivers = Channel.CreateUnbounded<RemoteDeliver>();
private readonly Channel<RemoteDeliver> _remotePriorityDelivers = Channel.CreateUnbounded<RemoteDeliver>();
private readonly Task _sender;
private readonly object _synLock = new();
private readonly Dictionary<string, HashSet<PID>> _watchedActors = new();
Expand Down Expand Up @@ -112,8 +115,13 @@ public void SendMessage(PID target, object msg)
}

var env = new RemoteDeliver(header, message, target, sender);
var didWrite = message switch
{
IRemotePriorityMessage => _remotePriorityDelivers.Writer.TryWrite(env),
_ => _remoteDelivers.Writer.TryWrite(env)
};

if (CancellationToken.IsCancellationRequested || !_remoteDelivers.Writer.TryWrite(env))
if (CancellationToken.IsCancellationRequested || !didWrite)
{
_logger.LogWarning("[{SystemAddress}] Dropping message {MessageType} {MessagePayload} to {Target} from {Sender}",
System.Address,
Expand Down Expand Up @@ -372,17 +380,48 @@ private async Task RunAsync()
try
{
var messages = new List<RemoteDeliver>(RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize);

while (await _remoteDelivers.Reader.WaitToReadAsync(CancellationToken).ConfigureAwait(false))
while (true)
{
while (_remoteDelivers.Reader.TryRead(out var remoteDeliver))
var t1 = _remoteDelivers.Reader.WaitToReadAsync(CancellationToken).AsTask();
var t2 = _remotePriorityDelivers.Reader.WaitToReadAsync(CancellationToken).AsTask();
await Task.WhenAny(t1, t2);
var i = 0;
while (true)
{
messages.Add(remoteDeliver);
var didWrite = false;
RemoteDeliver? remoteDeliver;

//we don´t need complete priority, we need "enough" important messages to get over
if (i++ % 10 == 0)
{
if (_remotePriorityDelivers.Reader.TryRead(out remoteDeliver))
{
messages.Add(remoteDeliver);
didWrite = true;
}

if (messages.Count >= RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize)
{
break;
}
}

if (_remoteDelivers.Reader.TryRead(out remoteDeliver))
{
messages.Add(remoteDeliver);
didWrite = true;
}

if (messages.Count >= RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize)
{
break;
}

if (!didWrite)
{
break;
}
}

var batch = CreateBatch(messages);
Expand Down
6 changes: 6 additions & 0 deletions src/Proto.Remote/IRemotePriorityMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Proto.Remote;

public interface IRemotePriorityMessage
{

}

0 comments on commit 36797fa

Please sign in to comment.