From 876bb4158d27f545475487692ad8a81098cd791a Mon Sep 17 00:00:00 2001 From: Lucas Teles Date: Thu, 21 Nov 2024 16:31:54 -0300 Subject: [PATCH] Change the outbox queue directly to UDP Client (#97) --- .../PingMessageHandler.cs | 10 +- .../Backdash.Benchmarks.Ping/Program.cs | 16 +- .../Cases/UdpClientBenchmark.cs | 37 +-- .../Backdash.Benchmarks/Network/Factory.cs | 3 +- .../Backdash.Benchmarks/Network/Message.cs | 15 +- src/Backdash/Backends/BackendServices.cs | 2 +- src/Backdash/Backends/Peer2PeerBackend.cs | 3 +- src/Backdash/Backends/SpectatorBackend.cs | 3 +- src/Backdash/Network/Client/PeerClient.cs | 219 +++++++++++------- .../Network/Client/PeerClientFactory.cs | 56 +++++ src/Backdash/Network/Client/UdpSocket.cs | 15 +- src/Backdash/Network/PeerConnection.cs | 2 - src/Backdash/Network/PeerConnectionFactory.cs | 10 +- .../Network/Protocol/Comm/ProtocolOutbox.cs | 88 ++----- .../Network/Protocol/ProtocolClientFactory.cs | 14 +- .../Network/Protocol/ProtocolOptions.cs | 6 +- .../Integration/Network/UdpPeerClientTests.cs | 6 +- .../TestUtils/Network/UdpClientContext.cs | 4 +- 18 files changed, 253 insertions(+), 256 deletions(-) create mode 100644 src/Backdash/Network/Client/PeerClientFactory.cs diff --git a/benchmarks/Backdash.Benchmarks.Ping/PingMessageHandler.cs b/benchmarks/Backdash.Benchmarks.Ping/PingMessageHandler.cs index 7dfcc1a4..ec827e58 100644 --- a/benchmarks/Backdash.Benchmarks.Ping/PingMessageHandler.cs +++ b/benchmarks/Backdash.Benchmarks.Ping/PingMessageHandler.cs @@ -3,10 +3,7 @@ namespace Backdash.Benchmarks.Ping; -sealed class PingMessageHandler( - IPeerClient sender, - Memory? buffer = null -) : IPeerObserver +sealed class PingMessageHandler(IPeerClient sender) : IPeerObserver { public static long TotalProcessed => processedCount; static long processedCount; @@ -29,10 +26,7 @@ CancellationToken stoppingToken }; try { - if (buffer is null) - await sender.SendTo(from, reply, stoppingToken); - else - await sender.SendTo(from, reply, buffer.Value, stoppingToken); + await sender.SendTo(from, reply, null, stoppingToken); } catch (OperationCanceledException) { diff --git a/benchmarks/Backdash.Benchmarks.Ping/Program.cs b/benchmarks/Backdash.Benchmarks.Ping/Program.cs index c0a25cbd..136c22df 100644 --- a/benchmarks/Backdash.Benchmarks.Ping/Program.cs +++ b/benchmarks/Backdash.Benchmarks.Ping/Program.cs @@ -4,7 +4,7 @@ using Backdash.Network.Client; using Backdash.Serialization; -var totalDuration = TimeSpan.FromSeconds(10); +var totalDuration = TimeSpan.FromSeconds(20); var snapshotInterval = TimeSpan.FromSeconds(0); var printSnapshots = false; @@ -12,11 +12,9 @@ using BackgroundJobManager jobs = new(logger); const int bufferSize = Max.CompressedBytes * Max.NumberOfPlayers; -var sendBuffer1 = Mem.AllocatePinnedMemory(bufferSize); -var sendBuffer2 = Mem.AllocatePinnedMemory(bufferSize); -using var peer1 = CreateClient(9000, sendBuffer1); -using var peer2 = CreateClient(9001, sendBuffer2); +using var peer1 = CreateClient(9000); +using var peer2 = CreateClient(9001); using CancellationTokenSource cts = new(); cts.CancelAfter(totalDuration); @@ -30,7 +28,7 @@ measurer.Start(); IPEndPoint peer2Endpoint = new(IPAddress.Loopback, 9001); -_ = peer1.SendTo(peer2Endpoint.Serialize(), PingMessage.Ping, sendBuffer1).AsTask(); +_ = peer1.SendTo(peer2Endpoint.Serialize(), PingMessage.Ping).AsTask(); Console.WriteLine("Press enter to stop."); SpinWait.SpinUntil(() => Console.KeyAvailable || stopToken.IsCancellationRequested); @@ -44,7 +42,7 @@ Console.Clear(); Console.WriteLine(measurer.Summary(printSnapshots)); -IPeerClient CreateClient(int port, Memory? buffer = null) +IPeerClient CreateClient(int port) { PeerObserverGroup observers = new(); PeerClient peer = new( @@ -52,9 +50,11 @@ IPeerClient CreateClient(int port, Memory? buffer = null) BinarySerializerFactory.ForEnum(), observers, logger, + new Clock(), + null, bufferSize ); - observers.Add(new PingMessageHandler(peer, buffer)); + observers.Add(new PingMessageHandler(peer)); jobs.Register(peer); return peer; } diff --git a/benchmarks/Backdash.Benchmarks/Cases/UdpClientBenchmark.cs b/benchmarks/Backdash.Benchmarks/Cases/UdpClientBenchmark.cs index 66da1e55..81eaf61e 100644 --- a/benchmarks/Backdash.Benchmarks/Cases/UdpClientBenchmark.cs +++ b/benchmarks/Backdash.Benchmarks/Cases/UdpClientBenchmark.cs @@ -1,7 +1,6 @@ using System.Diagnostics; using System.Net; using Backdash.Benchmarks.Network; -using Backdash.Core; #pragma warning disable CS0649, AsyncFixer01, AsyncFixer02 // ReSharper disable AccessToDisposedClosure @@ -16,35 +15,17 @@ public class UdpClientBenchmark [Params(1000, 50_000)] public int N; - Memory pingerPinnedBuffer = Memory.Empty; - Memory pongerPinnedBuffer = Memory.Empty; - - [GlobalSetup] - public void Setup() - { - pingerPinnedBuffer = Mem.AllocatePinnedMemory(Max.UdpPacketSize); - pongerPinnedBuffer = Mem.AllocatePinnedMemory(Max.UdpPacketSize); - } - [Benchmark] - public async Task ArrayPoolBuffer() => await Start(N, null, null); + public async Task SendTest() => await Start(N); - [Benchmark] - public async Task PinnedSendBuffer() => await Start(N, pingerPinnedBuffer, pongerPinnedBuffer); - - public async Task Start( - int numberOfSpins, - Memory pingerSendBuffer, - Memory pongerSendBuffer, - TimeSpan? timeout = null - ) + public async Task Start(int numberOfSpins, TimeSpan? timeout = null) { timeout ??= TimeSpan.FromSeconds(5); using var pinger = Factory.CreateUdpClient(9000, out var pingerObservers); using var ponger = Factory.CreateUdpClient(9001, out var pongerObservers); - PingMessageHandler pingerHandler = new("Pinger", pinger, pingerSendBuffer); - PingMessageHandler pongerHandler = new("Ponger", ponger, pongerSendBuffer); + PingMessageHandler pingerHandler = new("Pinger", pinger); + PingMessageHandler pongerHandler = new("Ponger", ponger); pingerObservers.Add(pingerHandler); pongerObservers.Add(pongerHandler); @@ -63,19 +44,11 @@ void OnProcessed(long count) IPEndPoint pongerEndpoint = new(IPAddress.Loopback, 9001); var pongerAddress = pongerEndpoint.Serialize(); - async Task StartSending() - { - if (pingerSendBuffer.IsEmpty) - await pinger.SendTo(pongerAddress, PingMessage.Ping, ct); - else - await pinger.SendTo(pongerAddress, PingMessage.Ping, pingerSendBuffer, ct); - } - Task[] tasks = [ pinger.Start(ct), ponger.Start(ct), - StartSending(), + pinger.SendTo(pongerAddress, PingMessage.Ping, null, ct).AsTask(), ]; await Task.WhenAll(tasks).ConfigureAwait(false); pingerHandler.OnProcessed -= OnProcessed; diff --git a/benchmarks/Backdash.Benchmarks/Network/Factory.cs b/benchmarks/Backdash.Benchmarks/Network/Factory.cs index 947cad62..4ffd7db2 100644 --- a/benchmarks/Backdash.Benchmarks/Network/Factory.cs +++ b/benchmarks/Backdash.Benchmarks/Network/Factory.cs @@ -16,7 +16,8 @@ out PeerObserverGroup observers new UdpSocket(port), BinarySerializerFactory.ForEnum(), observers, - Logger.CreateConsoleLogger(LogLevel.None) + Logger.CreateConsoleLogger(LogLevel.None), + new Clock() ); return client; diff --git a/benchmarks/Backdash.Benchmarks/Network/Message.cs b/benchmarks/Backdash.Benchmarks/Network/Message.cs index b32fc324..bda11c52 100644 --- a/benchmarks/Backdash.Benchmarks/Network/Message.cs +++ b/benchmarks/Backdash.Benchmarks/Network/Message.cs @@ -1,5 +1,6 @@ using System.Net; using Backdash.Network.Client; + #pragma warning disable CS9113 // Parameter is unread. namespace Backdash.Benchmarks.Network; @@ -8,17 +9,15 @@ public enum PingMessage : long Ping = 111111111, Pong = 999999999, } -sealed class PingMessageHandler( - string name, - IPeerClient sender, - Memory sendBuffer -) : IPeerObserver + +sealed class PingMessageHandler(string name, IPeerClient sender) : IPeerObserver { long processedCount; long badMessages; public long ProcessedCount => processedCount; public long BadMessages => badMessages; public event Action OnProcessed = delegate { }; + public async ValueTask OnPeerMessage( PingMessage message, SocketAddress from, @@ -41,15 +40,13 @@ CancellationToken stoppingToken }; try { - if (sendBuffer.IsEmpty) - await sender.SendTo(from, reply, stoppingToken); - else - await sender.SendTo(from, reply, sendBuffer, stoppingToken); + await sender.SendTo(from, reply, null, stoppingToken); } catch (OperationCanceledException) { // skip } + OnProcessed(processedCount); #if DEBUG Console.WriteLine( diff --git a/src/Backdash/Backends/BackendServices.cs b/src/Backdash/Backends/BackendServices.cs index ab235c39..7257cd58 100644 --- a/src/Backdash/Backends/BackendServices.cs +++ b/src/Backdash/Backends/BackendServices.cs @@ -50,7 +50,7 @@ public BackendServices(RollbackOptions options, SessionServices services this, services.Clock, services.Random, - services.DelayStrategy, logger, - backgroundJobManager, udp, this.options.Protocol, this.options.TimeSync ); + backgroundJobManager.Register(udp); } diff --git a/src/Backdash/Backends/SpectatorBackend.cs b/src/Backdash/Backends/SpectatorBackend.cs index f7b165bf..41d59ced 100644 --- a/src/Backdash/Backends/SpectatorBackend.cs +++ b/src/Backdash/Backends/SpectatorBackend.cs @@ -71,8 +71,7 @@ public SpectatorBackend(int port, var magicNumber = services.Random.MagicNumber(); PeerConnectionFactory peerConnectionFactory = new( - this, clock, services.Random, services.DelayStrategy, logger, - backgroundJobManager, udp, options.Protocol, options.TimeSync + this, clock, services.Random, logger, udp, options.Protocol, options.TimeSync ); ProtocolState protocolState = diff --git a/src/Backdash/Network/Client/PeerClient.cs b/src/Backdash/Network/Client/PeerClient.cs index 9dc0e9ef..358b35f5 100644 --- a/src/Backdash/Network/Client/PeerClient.cs +++ b/src/Backdash/Network/Client/PeerClient.cs @@ -1,8 +1,7 @@ -using System.Buffers; using System.Diagnostics; using System.Net; using System.Net.Sockets; -using System.Runtime.CompilerServices; +using System.Threading.Channels; using Backdash.Core; using Backdash.Serialization; @@ -11,25 +10,43 @@ namespace Backdash.Network.Client; /// /// Client for peer communication /// -public interface IPeerClient : IDisposable where T : struct +public interface IPeerClient : IDisposable where T : struct { /// /// Send Message to peer /// - ValueTask SendTo(SocketAddress peerAddress, T payload, CancellationToken ct = default); + ValueTask SendTo(SocketAddress peerAddress, in T payload, IMessageHandler? callback = null, + CancellationToken cancellationToken = default); /// - /// Send Message to peer + /// Try To Send Message to peer /// - ValueTask SendTo(SocketAddress peerAddress, T payload, Memory buffer, CancellationToken ct = default); + bool TrySendTo(SocketAddress peerAddress, in T payload, IMessageHandler? callback = null); + /// /// Start receiving messages /// - Task StartReceiving(CancellationToken cancellationToken); + Task ProcessMessages(CancellationToken cancellationToken); +} + +/// +/// Message sent handler +/// +public interface IMessageHandler where T : struct +{ + /// + /// Handles sent message + /// + void AfterSendMessage(int bytesSent); + + /// + /// Prepare message to be sent + /// + void BeforeSendMessage(ref T message); } -interface IPeerJobClient : IBackgroundJob, IPeerClient where T : struct; +interface IPeerJobClient : IBackgroundJob, IPeerClient where T : struct; sealed class PeerClient : IPeerJobClient where T : struct { @@ -37,16 +54,32 @@ sealed class PeerClient : IPeerJobClient where T : struct readonly IPeerObserver observer; readonly IBinarySerializer serializer; readonly Logger logger; + readonly IClock clock; + readonly IDelayStrategy? delayStrategy; readonly int maxPacketSize; + readonly Channel sendQueue; CancellationTokenSource? cancellation; public string JobName { get; } + public TimeSpan NetworkLatency = TimeSpan.Zero; + + struct QueueEntry(T body, SocketAddress recipient, long queuedAt, IMessageHandler? callback) + { + public T Body = body; + public readonly SocketAddress Recipient = recipient; + public readonly long QueuedAt = queuedAt; + public readonly IMessageHandler? Callback = callback; + } + public PeerClient( IPeerSocket socket, IBinarySerializer serializer, IPeerObserver observer, Logger logger, - int maxPacketSize = Max.UdpPacketSize + IClock clock, + IDelayStrategy? delayStrategy = null, + int maxPacketSize = Max.UdpPacketSize, + int maxPackageQueue = Default.MaxPackageQueue ) { ArgumentNullException.ThrowIfNull(socket); @@ -58,35 +91,106 @@ public PeerClient( this.observer = observer; this.serializer = serializer; this.logger = logger; + this.clock = clock; + this.delayStrategy = delayStrategy; this.maxPacketSize = maxPacketSize; + sendQueue = Channel.CreateBounded( + new BoundedChannelOptions(maxPackageQueue) + { + SingleWriter = false, + SingleReader = true, + AllowSynchronousContinuations = true, + FullMode = BoundedChannelFullMode.DropOldest, + }); + JobName = $"{nameof(UdpClient)} ({socket.Port})"; } - public Task Start(CancellationToken cancellationToken) => - StartReceiving(cancellationToken); + public Task Start(CancellationToken cancellationToken) => ProcessMessages(cancellationToken); - public async Task StartReceiving(CancellationToken cancellationToken) + public async Task ProcessMessages(CancellationToken cancellationToken) { - if (cancellation is not null) return; + if (cancellation is not null) + return; + cancellation = new(); using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cancellation.Token); var token = cts.Token; - await ReceiveLoop(token).ConfigureAwait(false); + + await Task.WhenAll(StartReceiving(token), StartSending(token)).ConfigureAwait(false); + } + + public async Task StartSending(CancellationToken cancellationToken) + { + var buffer = Mem.AllocatePinnedMemory(maxPacketSize); + var reader = sendQueue.Reader; + + while (!cancellationToken.IsCancellationRequested) + { + try + { + await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false); + while (reader.TryRead(out var entry)) + { + if (NetworkLatency > TimeSpan.Zero && delayStrategy is not null) + { + var jitter = delayStrategy.Jitter(NetworkLatency); + SpinWait sw = new(); + while (clock.GetElapsedTime(entry.QueuedAt) <= jitter) + { + sw.SpinOnce(); + // LATER: allocations here with Task.Delay + // await Task.Delay(delayDiff, ct).ConfigureAwait(false) + } + } + + entry.Callback?.BeforeSendMessage(ref entry.Body); + + var bodySize = serializer.Serialize(in entry.Body, buffer.Span); + var sentSize = await socket.SendToAsync(buffer[..bodySize], entry.Recipient, cancellationToken) + .ConfigureAwait(false); + + Trace.Assert(sentSize == bodySize); + + entry.Callback?.AfterSendMessage(sentSize); + } + } + catch (TaskCanceledException) + { + break; + } + catch (OperationCanceledException) + { + break; + } + catch (SocketException ex) + { + if (logger.EnabledLevel is not LogLevel.None) + logger.Write(LogLevel.Error, $"Socket send error: {ex}"); + break; + } + catch (Exception ex) + { + if (logger.EnabledLevel is not LogLevel.None) + logger.Write(LogLevel.Error, $"Socket send error: {ex}"); + break; + } + } } - async Task ReceiveLoop(CancellationToken ct) + public async Task StartReceiving(CancellationToken cancellationToken) { var buffer = Mem.AllocatePinnedArray(maxPacketSize); SocketAddress address = new(socket.AddressFamily); T msg = default; - while (!ct.IsCancellationRequested) + while (!cancellationToken.IsCancellationRequested) { int receivedSize; try { receivedSize = await socket - .ReceiveFromAsync(buffer, address, ct) + .ReceiveFromAsync(buffer, address, cancellationToken) .ConfigureAwait(false); } catch (TaskCanceledException) @@ -100,13 +204,13 @@ async Task ReceiveLoop(CancellationToken ct) catch (SocketException ex) { if (logger.EnabledLevel is not LogLevel.None) - logger.Write(LogLevel.Error, $"Socket error: {ex}"); + logger.Write(LogLevel.Error, $"Socket rcv error: {ex}"); break; } catch (Exception ex) { if (logger.EnabledLevel is not LogLevel.None) - logger.Write(LogLevel.Error, $"Socket error: {ex}"); + logger.Write(LogLevel.Error, $"Socket rcv error: {ex}"); break; } @@ -116,7 +220,7 @@ async Task ReceiveLoop(CancellationToken ct) try { serializer.Deserialize(buffer.AsSpan(..receivedSize), ref msg); - await observer.OnPeerMessage(msg, address, receivedSize, ct).ConfigureAwait(false); + await observer.OnPeerMessage(msg, address, receivedSize, cancellationToken).ConfigureAwait(false); } catch (NetcodeDeserializationException ex) { @@ -130,82 +234,19 @@ async Task ReceiveLoop(CancellationToken ct) #pragma warning restore S1854 } - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] - public async ValueTask SendTo( - SocketAddress peerAddress, - T payload, - Memory buffer, - CancellationToken ct = default - ) - { - var bodySize = serializer.Serialize(in payload, buffer.Span); - var sentSize = await socket.SendToAsync(buffer[..bodySize], peerAddress, ct).ConfigureAwait(false); - Trace.Assert(sentSize == bodySize); - return sentSize; - } + public ValueTask SendTo(SocketAddress peerAddress, in T payload, + IMessageHandler? callback = null, CancellationToken cancellationToken = default) => + sendQueue.Writer.WriteAsync(new(payload, peerAddress, clock.GetTimeStamp(), callback), cancellationToken); - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] - public async ValueTask SendTo( - SocketAddress peerAddress, - T payload, - CancellationToken ct = default - ) - { - var buffer = ArrayPool.Shared.Rent(maxPacketSize); - var sentBytes = await SendTo(peerAddress, payload, buffer, ct).ConfigureAwait(false); - ArrayPool.Shared.Return(buffer); - return sentBytes; - } + public bool TrySendTo(SocketAddress peerAddress, in T payload, IMessageHandler? callback = null) => + sendQueue.Writer.TryWrite(new(payload, peerAddress, clock.GetTimeStamp(), callback)); public void Dispose() { cancellation?.Cancel(); cancellation?.Dispose(); + sendQueue.Writer.TryComplete(); socket.Close(); socket.Dispose(); } } - -/// -/// Create new instances of -/// -public static class PeerClientFactory -{ - /// - /// Creates new - /// - public static IPeerClient Create( - IPeerSocket socket, - IBinarySerializer serializer, - IPeerObserver observer, - int maxPacketSize = Max.UdpPacketSize, - LogLevel logLevel = LogLevel.None, - ILogWriter? logWriter = null - ) where T : unmanaged => new PeerClient( - socket, - serializer, - observer, - Logger.CreateConsoleLogger(logLevel, logWriter), - maxPacketSize - ); - -#if !AOT_ENABLED - /// - /// Creates new - /// - public static IPeerClient Create( - IPeerSocket socket, - IPeerObserver observer, - int maxPacketSize = Max.UdpPacketSize, - LogLevel logLevel = LogLevel.None, - ILogWriter? logWriter = null - ) where T : unmanaged => Create( - socket, - BinarySerializerFactory.FindOrThrow(), - observer, - maxPacketSize, - logLevel, - logWriter - ); -#endif -} diff --git a/src/Backdash/Network/Client/PeerClientFactory.cs b/src/Backdash/Network/Client/PeerClientFactory.cs new file mode 100644 index 00000000..ecd115e8 --- /dev/null +++ b/src/Backdash/Network/Client/PeerClientFactory.cs @@ -0,0 +1,56 @@ +using Backdash.Core; +using Backdash.Serialization; + +namespace Backdash.Network.Client; + +/// +/// Create new instances of +/// +public static class PeerClientFactory +{ + /// + /// Creates new + /// + public static IPeerClient Create( + IPeerSocket socket, + IBinarySerializer serializer, + IPeerObserver observer, + int maxPacketSize = Max.UdpPacketSize, + LogLevel logLevel = LogLevel.None, + ILogWriter? logWriter = null, + DelayStrategy delayStrategy = DelayStrategy.Gaussian, + Random? random = null + ) where T : unmanaged => new PeerClient( + socket, + serializer, + observer, + Logger.CreateConsoleLogger(logLevel, logWriter), + new Clock(), + DelayStrategyFactory.Create(new DefaultRandomNumberGenerator(random ?? Random.Shared), delayStrategy), + maxPacketSize + ); + +#if !AOT_ENABLED + /// + /// Creates new + /// + public static IPeerClient Create( + IPeerSocket socket, + IPeerObserver observer, + int maxPacketSize = Max.UdpPacketSize, + LogLevel logLevel = LogLevel.None, + ILogWriter? logWriter = null, + DelayStrategy delayStrategy = DelayStrategy.Gaussian, + Random? random = null + ) where T : unmanaged => Create( + socket, + BinarySerializerFactory.FindOrThrow(), + observer, + maxPacketSize, + logLevel, + logWriter, + delayStrategy, + random + ); +#endif +} diff --git a/src/Backdash/Network/Client/UdpSocket.cs b/src/Backdash/Network/Client/UdpSocket.cs index 161b9e54..dfdfc30c 100644 --- a/src/Backdash/Network/Client/UdpSocket.cs +++ b/src/Backdash/Network/Client/UdpSocket.cs @@ -18,7 +18,6 @@ public sealed class UdpSocket : IPeerSocket readonly Socket socket; readonly IPEndPoint anyEndPoint; - readonly object locker = new(); /// /// Gets the main bind port of the Socket. @@ -121,11 +120,8 @@ public ValueTask ReceiveAsync(Memory buffer, Canc /// A cancellation token that can be used to cancel the asynchronous operation. /// An asynchronous task that completes with the number of bytes sent. public ValueTask SendToAsync(ReadOnlyMemory buffer, SocketAddress socketAddress, - CancellationToken cancellationToken) - { - lock (locker) - return socket.SendToAsync(buffer, SocketFlags.None, socketAddress, cancellationToken); - } + CancellationToken cancellationToken) => + socket.SendToAsync(buffer, SocketFlags.None, socketAddress, cancellationToken); /// /// Sends data to the specified remote host. @@ -135,11 +131,8 @@ public ValueTask SendToAsync(ReadOnlyMemory buffer, SocketAddress soc /// A cancellation token that can be used to cancel the asynchronous operation. /// An asynchronous task that completes with the number of bytes sent. public ValueTask SendToAsync(ReadOnlyMemory buffer, EndPoint remoteEndPoint, - CancellationToken cancellationToken) - { - lock (locker) - return socket.SendToAsync(buffer, SocketFlags.None, remoteEndPoint, cancellationToken); - } + CancellationToken cancellationToken) => + socket.SendToAsync(buffer, SocketFlags.None, remoteEndPoint, cancellationToken); /// public void Dispose() => socket.Dispose(); diff --git a/src/Backdash/Network/PeerConnection.cs b/src/Backdash/Network/PeerConnection.cs index ad21508d..fbe1d21a 100644 --- a/src/Backdash/Network/PeerConnection.cs +++ b/src/Backdash/Network/PeerConnection.cs @@ -84,8 +84,6 @@ public void Dispose() qualityReportTimer.Dispose(); keepAliveTimer.Dispose(); resendInputsTimer.Dispose(); - - outbox.Dispose(); } void StopTimers() diff --git a/src/Backdash/Network/PeerConnectionFactory.cs b/src/Backdash/Network/PeerConnectionFactory.cs index 0864f83e..e1596959 100644 --- a/src/Backdash/Network/PeerConnectionFactory.cs +++ b/src/Backdash/Network/PeerConnectionFactory.cs @@ -12,9 +12,7 @@ sealed class PeerConnectionFactory( IProtocolNetworkEventHandler networkEventHandler, IClock clock, IRandomNumberGenerator random, - IDelayStrategy delayStrategy, Logger logger, - IBackgroundJobManager jobManager, IPeerClient peer, ProtocolOptions options, TimeSyncOptions timeSyncOptions @@ -27,14 +25,12 @@ IProtocolInputEventPublisher inputEventQueue ) where TInput : unmanaged { var timeSync = new TimeSync(timeSyncOptions, logger); - var outbox = new ProtocolOutbox(state, options, peer, delayStrategy, clock, logger); + var outbox = new ProtocolOutbox(state, peer, clock, logger); var syncManager = new ProtocolSynchronizer(logger, clock, random, state, options, outbox, networkEventHandler); var inbox = new ProtocolInbox( options, inputSerializer, state, clock, syncManager, outbox, networkEventHandler, inputEventQueue, logger); - var inputBuffer = new ProtocolInputBuffer( - options, inputSerializer, state, logger, timeSync, outbox, inbox); - - jobManager.Register(outbox, state.StoppingToken); + var inputBuffer = + new ProtocolInputBuffer(options, inputSerializer, state, logger, timeSync, outbox, inbox); PeerConnection connection = new( options, state, logger, clock, timeSync, networkEventHandler, diff --git a/src/Backdash/Network/Protocol/Comm/ProtocolOutbox.cs b/src/Backdash/Network/Protocol/Comm/ProtocolOutbox.cs index 7db7bf0e..a6215a55 100644 --- a/src/Backdash/Network/Protocol/Comm/ProtocolOutbox.cs +++ b/src/Backdash/Network/Protocol/Comm/ProtocolOutbox.cs @@ -1,4 +1,3 @@ -using System.Threading.Channels; using Backdash.Core; using Backdash.Data; using Backdash.Network.Client; @@ -6,94 +5,35 @@ namespace Backdash.Network.Protocol.Comm; -interface IProtocolOutbox : IMessageSender, IBackgroundJob, IDisposable; +interface IProtocolOutbox : IMessageSender, IMessageHandler; sealed class ProtocolOutbox( ProtocolState state, - ProtocolOptions options, IPeerClient peer, - IDelayStrategy delayStrategy, IClock clock, Logger logger ) : IProtocolOutbox { - struct QueueEntry - { - public long QueueTime; - public ProtocolMessage Body; - } - - readonly Channel sendQueue = - Channel.CreateBounded( - new BoundedChannelOptions(options.MaxPackageQueue) - { - SingleWriter = true, - SingleReader = true, - AllowSynchronousContinuations = true, - FullMode = BoundedChannelFullMode.DropOldest, - }); - int nextSendSeq; - public string JobName { get; } = $"{nameof(ProtocolOutbox)} {state.Player}"; - QueueEntry CreateNextEntry(in ProtocolMessage msg) => - new() - { - QueueTime = clock.GetTimeStamp(), - Body = msg, - }; + public ValueTask SendMessageAsync(in ProtocolMessage msg, CancellationToken ct) => + peer.SendTo(state.PeerAddress.Address, in msg, this, ct); - public ValueTask SendMessageAsync(in ProtocolMessage msg, CancellationToken ct) - { - var nextEntry = CreateNextEntry(in msg); - return sendQueue.Writer.WriteAsync(nextEntry, ct); - } + public bool SendMessage(in ProtocolMessage msg) => peer.TrySendTo(state.PeerAddress.Address, in msg, this); - public bool SendMessage(in ProtocolMessage msg) + public void BeforeSendMessage(ref ProtocolMessage message) { - var nextEntry = CreateNextEntry(in msg); - return sendQueue.Writer.TryWrite(nextEntry); + message.Header.Magic = state.SyncNumber; + message.Header.SequenceNumber = (ushort)nextSendSeq; + nextSendSeq++; + + logger.Write(LogLevel.Trace, $"send {message} on {state.Player}"); } - public async Task Start(CancellationToken cancellationToken) + public void AfterSendMessage(int bytesSent) { - var sendLatency = options.NetworkLatency; - var reader = sendQueue.Reader; - var buffer = Mem.AllocatePinnedMemory(options.UdpPacketBufferSize); - while (!cancellationToken.IsCancellationRequested) - { - await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false); - while (reader.TryRead(out var entry)) - { - var message = entry.Body; - message.Header.Magic = state.SyncNumber; - message.Header.SequenceNumber = (ushort)nextSendSeq; - nextSendSeq++; - - logger.Write(LogLevel.Trace, $"send {message} on {state.Player}"); - - if (sendLatency > TimeSpan.Zero) - { - var jitter = delayStrategy.Jitter(sendLatency); - SpinWait sw = new(); - while (clock.GetElapsedTime(entry.QueueTime) <= jitter) - { - sw.SpinOnce(); - // LATER: allocations here with Task.Delay - // await Task.Delay(delayDiff, ct).ConfigureAwait(false) - } - } - - var bytesSent = await peer - .SendTo(state.PeerAddress.Address, message, buffer, cancellationToken) - .ConfigureAwait(false); - - state.Stats.Send.LastTime = clock.GetTimeStamp(); - state.Stats.Send.TotalBytes += (ByteSize)bytesSent; - state.Stats.Send.TotalPackets++; - } - } + state.Stats.Send.LastTime = clock.GetTimeStamp(); + state.Stats.Send.TotalBytes += (ByteSize)bytesSent; + state.Stats.Send.TotalPackets++; } - - public void Dispose() => sendQueue.Writer.TryComplete(); } diff --git a/src/Backdash/Network/Protocol/ProtocolClientFactory.cs b/src/Backdash/Network/Protocol/ProtocolClientFactory.cs index b953866d..38f0dd5f 100644 --- a/src/Backdash/Network/Protocol/ProtocolClientFactory.cs +++ b/src/Backdash/Network/Protocol/ProtocolClientFactory.cs @@ -14,7 +14,9 @@ interface IProtocolClientFactory sealed class ProtocolClientFactory( RollbackOptions options, IPeerSocketFactory socketFactory, - Logger logger + IClock clock, + Logger logger, + IDelayStrategy delayStrategy ) : IProtocolClientFactory { public IProtocolClient CreateProtocolClient(int port, IPeerObserver observer) => @@ -23,6 +25,12 @@ public IProtocolClient CreateProtocolClient(int port, IPeerObserver - /// Distance to check out of order packets. + /// Distance to check out-of-order packets. /// /// public int MaxSequenceDistance { get; init; } = Default.MaxSeqDistance; @@ -104,7 +104,7 @@ public class ProtocolOptions public TimeSpan SyncFirstRetryInterval { get; init; } = TimeSpan.FromMilliseconds(Default.SyncFirstRetryInterval); /// - /// When the time from the last send package until now is greater then this, sends a keep alive packets. + /// When the time from the last send package until now is greater than this, sends a keep alive packets. /// /// public TimeSpan KeepAliveInterval { get; init; } = TimeSpan.FromMilliseconds(Default.KeepAliveInterval); @@ -123,7 +123,7 @@ public class ProtocolOptions public TimeSpan NetworkStatsInterval { get; init; } = TimeSpan.FromMilliseconds(Default.NetworkStatsInterval); /// - /// When the time from the last send input until now is greater then this, resends pending inputs. + /// When the time from the last send input until now is greater than this, resends pending inputs. /// /// public TimeSpan ResendInputInterval { get; init; } = TimeSpan.FromMilliseconds(Default.ResendInputInterval); diff --git a/tests/Backdash.Tests/Specs/Integration/Network/UdpPeerClientTests.cs b/tests/Backdash.Tests/Specs/Integration/Network/UdpPeerClientTests.cs index 682cb57b..593abc27 100644 --- a/tests/Backdash.Tests/Specs/Integration/Network/UdpPeerClientTests.cs +++ b/tests/Backdash.Tests/Specs/Integration/Network/UdpPeerClientTests.cs @@ -41,7 +41,7 @@ public async Task ShouldSendAndReceive() message.Value.Should().Be("hello server"); sender.Should().Be(client.Address); counter.Inc(); - await server.Client.SendTo(sender, "hello client", token); + await server.Client.SendTo(sender, "hello client", null, token); }; client.Observer.OnMessage += (message, sender, _, _) => { @@ -168,11 +168,11 @@ CancellationToken ct break; case OpMessage.IncrementCallback: Interlocked.Increment(ref totalResult); - await udpClient.SendTo(sender, OpMessage.Decrement, ct); + await udpClient.SendTo(sender, OpMessage.Decrement, null, ct); break; case OpMessage.DecrementCallback: Interlocked.Decrement(ref totalResult); - await udpClient.SendTo(sender, OpMessage.Increment, ct); + await udpClient.SendTo(sender, OpMessage.Increment, null, ct); break; default: throw new ArgumentOutOfRangeException(nameof(message), message, null); diff --git a/tests/Backdash.Tests/TestUtils/Network/UdpClientContext.cs b/tests/Backdash.Tests/TestUtils/Network/UdpClientContext.cs index 2097eb05..1c7d296f 100644 --- a/tests/Backdash.Tests/TestUtils/Network/UdpClientContext.cs +++ b/tests/Backdash.Tests/TestUtils/Network/UdpClientContext.cs @@ -27,7 +27,9 @@ public UdpClientContext(IBinarySerializer serializer, int? port = null) socket, serializer, Observer, - Logger.CreateConsoleLogger(LogLevel.None)); + Logger.CreateConsoleLogger(LogLevel.None), + new Clock() + ); } public void Dispose() => Client.Dispose();