Skip to content

Commit

Permalink
Change the outbox queue directly to UDP Client (#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasteles authored Nov 21, 2024
1 parent 9b689c4 commit 876bb41
Show file tree
Hide file tree
Showing 18 changed files with 253 additions and 256 deletions.
10 changes: 2 additions & 8 deletions benchmarks/Backdash.Benchmarks.Ping/PingMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@

namespace Backdash.Benchmarks.Ping;

sealed class PingMessageHandler(
IPeerClient<PingMessage> sender,
Memory<byte>? buffer = null
) : IPeerObserver<PingMessage>
sealed class PingMessageHandler(IPeerClient<PingMessage> sender) : IPeerObserver<PingMessage>
{
public static long TotalProcessed => processedCount;
static long processedCount;
Expand All @@ -29,10 +26,7 @@ CancellationToken stoppingToken
};
try
{
if (buffer is null)
await sender.SendTo(from, reply, stoppingToken);
else
await sender.SendTo(from, reply, buffer.Value, stoppingToken);
await sender.SendTo(from, reply, null, stoppingToken);
}
catch (OperationCanceledException)
{
Expand Down
16 changes: 8 additions & 8 deletions benchmarks/Backdash.Benchmarks.Ping/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,17 @@
using Backdash.Network.Client;
using Backdash.Serialization;

var totalDuration = TimeSpan.FromSeconds(10);
var totalDuration = TimeSpan.FromSeconds(20);
var snapshotInterval = TimeSpan.FromSeconds(0);
var printSnapshots = false;

var logger = Logger.CreateConsoleLogger(LogLevel.None);
using BackgroundJobManager jobs = new(logger);

const int bufferSize = Max.CompressedBytes * Max.NumberOfPlayers;
var sendBuffer1 = Mem.AllocatePinnedMemory(bufferSize);
var sendBuffer2 = Mem.AllocatePinnedMemory(bufferSize);

using var peer1 = CreateClient(9000, sendBuffer1);
using var peer2 = CreateClient(9001, sendBuffer2);
using var peer1 = CreateClient(9000);
using var peer2 = CreateClient(9001);

using CancellationTokenSource cts = new();
cts.CancelAfter(totalDuration);
Expand All @@ -30,7 +28,7 @@
measurer.Start();

IPEndPoint peer2Endpoint = new(IPAddress.Loopback, 9001);
_ = peer1.SendTo(peer2Endpoint.Serialize(), PingMessage.Ping, sendBuffer1).AsTask();
_ = peer1.SendTo(peer2Endpoint.Serialize(), PingMessage.Ping).AsTask();

Console.WriteLine("Press enter to stop.");
SpinWait.SpinUntil(() => Console.KeyAvailable || stopToken.IsCancellationRequested);
Expand All @@ -44,17 +42,19 @@
Console.Clear();
Console.WriteLine(measurer.Summary(printSnapshots));

IPeerClient<PingMessage> CreateClient(int port, Memory<byte>? buffer = null)
IPeerClient<PingMessage> CreateClient(int port)
{
PeerObserverGroup<PingMessage> observers = new();
PeerClient<PingMessage> peer = new(
new UdpSocket(port),
BinarySerializerFactory.ForEnum<PingMessage>(),
observers,
logger,
new Clock(),
null,
bufferSize
);
observers.Add(new PingMessageHandler(peer, buffer));
observers.Add(new PingMessageHandler(peer));
jobs.Register(peer);
return peer;
}
37 changes: 5 additions & 32 deletions benchmarks/Backdash.Benchmarks/Cases/UdpClientBenchmark.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System.Diagnostics;
using System.Net;
using Backdash.Benchmarks.Network;
using Backdash.Core;

#pragma warning disable CS0649, AsyncFixer01, AsyncFixer02
// ReSharper disable AccessToDisposedClosure
Expand All @@ -16,35 +15,17 @@ public class UdpClientBenchmark
[Params(1000, 50_000)]
public int N;

Memory<byte> pingerPinnedBuffer = Memory<byte>.Empty;
Memory<byte> pongerPinnedBuffer = Memory<byte>.Empty;

[GlobalSetup]
public void Setup()
{
pingerPinnedBuffer = Mem.AllocatePinnedMemory(Max.UdpPacketSize);
pongerPinnedBuffer = Mem.AllocatePinnedMemory(Max.UdpPacketSize);
}

[Benchmark]
public async Task ArrayPoolBuffer() => await Start(N, null, null);
public async Task SendTest() => await Start(N);

[Benchmark]
public async Task PinnedSendBuffer() => await Start(N, pingerPinnedBuffer, pongerPinnedBuffer);

public async Task Start(
int numberOfSpins,
Memory<byte> pingerSendBuffer,
Memory<byte> pongerSendBuffer,
TimeSpan? timeout = null
)
public async Task Start(int numberOfSpins, TimeSpan? timeout = null)
{
timeout ??= TimeSpan.FromSeconds(5);
using var pinger = Factory.CreateUdpClient(9000, out var pingerObservers);
using var ponger = Factory.CreateUdpClient(9001, out var pongerObservers);

PingMessageHandler pingerHandler = new("Pinger", pinger, pingerSendBuffer);
PingMessageHandler pongerHandler = new("Ponger", ponger, pongerSendBuffer);
PingMessageHandler pingerHandler = new("Pinger", pinger);
PingMessageHandler pongerHandler = new("Ponger", ponger);

pingerObservers.Add(pingerHandler);
pongerObservers.Add(pongerHandler);
Expand All @@ -63,19 +44,11 @@ void OnProcessed(long count)
IPEndPoint pongerEndpoint = new(IPAddress.Loopback, 9001);
var pongerAddress = pongerEndpoint.Serialize();

async Task StartSending()
{
if (pingerSendBuffer.IsEmpty)
await pinger.SendTo(pongerAddress, PingMessage.Ping, ct);
else
await pinger.SendTo(pongerAddress, PingMessage.Ping, pingerSendBuffer, ct);
}

Task[] tasks =
[
pinger.Start(ct),
ponger.Start(ct),
StartSending(),
pinger.SendTo(pongerAddress, PingMessage.Ping, null, ct).AsTask(),
];
await Task.WhenAll(tasks).ConfigureAwait(false);
pingerHandler.OnProcessed -= OnProcessed;
Expand Down
3 changes: 2 additions & 1 deletion benchmarks/Backdash.Benchmarks/Network/Factory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ out PeerObserverGroup<PingMessage> observers
new UdpSocket(port),
BinarySerializerFactory.ForEnum<PingMessage>(),
observers,
Logger.CreateConsoleLogger(LogLevel.None)
Logger.CreateConsoleLogger(LogLevel.None),
new Clock()
);

return client;
Expand Down
15 changes: 6 additions & 9 deletions benchmarks/Backdash.Benchmarks/Network/Message.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Net;
using Backdash.Network.Client;

#pragma warning disable CS9113 // Parameter is unread.
namespace Backdash.Benchmarks.Network;

Expand All @@ -8,17 +9,15 @@ public enum PingMessage : long
Ping = 111111111,
Pong = 999999999,
}
sealed class PingMessageHandler(
string name,
IPeerClient<PingMessage> sender,
Memory<byte> sendBuffer
) : IPeerObserver<PingMessage>

sealed class PingMessageHandler(string name, IPeerClient<PingMessage> sender) : IPeerObserver<PingMessage>
{
long processedCount;
long badMessages;
public long ProcessedCount => processedCount;
public long BadMessages => badMessages;
public event Action<long> OnProcessed = delegate { };

public async ValueTask OnPeerMessage(
PingMessage message,
SocketAddress from,
Expand All @@ -41,15 +40,13 @@ CancellationToken stoppingToken
};
try
{
if (sendBuffer.IsEmpty)
await sender.SendTo(from, reply, stoppingToken);
else
await sender.SendTo(from, reply, sendBuffer, stoppingToken);
await sender.SendTo(from, reply, null, stoppingToken);
}
catch (OperationCanceledException)
{
// skip
}

OnProcessed(processedCount);
#if DEBUG
Console.WriteLine(
Expand Down
2 changes: 1 addition & 1 deletion src/Backdash/Backends/BackendServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public BackendServices(RollbackOptions options, SessionServices<TInput, TGameSta
JobManager = new BackgroundJobManager(Logger);

var socketFactory = services?.PeerSocketFactory ?? new PeerSocketFactory();
ProtocolClientFactory = new ProtocolClientFactory(options, socketFactory, Logger);
ProtocolClientFactory = new ProtocolClientFactory(options, socketFactory, Clock, Logger, DelayStrategy);
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/Backdash/Backends/Peer2PeerBackend.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,12 @@ BackendServices<TInput, TGameState> services
this,
services.Clock,
services.Random,
services.DelayStrategy,
logger,
backgroundJobManager,
udp,
this.options.Protocol,
this.options.TimeSync
);

backgroundJobManager.Register(udp);
}

Expand Down
3 changes: 1 addition & 2 deletions src/Backdash/Backends/SpectatorBackend.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ public SpectatorBackend(int port,
var magicNumber = services.Random.MagicNumber();

PeerConnectionFactory peerConnectionFactory = new(
this, clock, services.Random, services.DelayStrategy, logger,
backgroundJobManager, udp, options.Protocol, options.TimeSync
this, clock, services.Random, logger, udp, options.Protocol, options.TimeSync
);

ProtocolState protocolState =
Expand Down
Loading

0 comments on commit 876bb41

Please sign in to comment.