Skip to content

Commit

Permalink
synchrous message handler
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasteles committed Nov 22, 2024
1 parent 80e9025 commit 838f0fa
Show file tree
Hide file tree
Showing 18 changed files with 99 additions and 113 deletions.
21 changes: 7 additions & 14 deletions benchmarks/Backdash.Benchmarks.Ping/PingMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Diagnostics;
using System.Net;
using Backdash.Network.Client;

Expand All @@ -8,29 +9,21 @@ sealed class PingMessageHandler(IPeerClient<PingMessage> sender) : IPeerObserver
public static long TotalProcessed => processedCount;
static long processedCount;

public async ValueTask OnPeerMessage(
PingMessage message,
public void OnPeerMessage(
in PingMessage message,
SocketAddress from,
int bytesReceived,
CancellationToken stoppingToken
int bytesReceived
)
{
if (stoppingToken.IsCancellationRequested)
return;
Interlocked.Increment(ref processedCount);

var reply = message switch
{
PingMessage.Ping => PingMessage.Pong,
PingMessage.Pong => PingMessage.Ping,
_ => throw new ArgumentOutOfRangeException(nameof(message), message, null),
};
try
{
await sender.SendTo(from, reply, null, stoppingToken);
}
catch (OperationCanceledException)
{
// skip
}

Trace.Assert(sender.TrySendTo(from, reply));
}
}
5 changes: 3 additions & 2 deletions benchmarks/Backdash.Benchmarks.Ping/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using Backdash.Network.Client;
using Backdash.Serialization;

var totalDuration = TimeSpan.FromSeconds(20);
var totalDuration = TimeSpan.FromSeconds(10);
var snapshotInterval = TimeSpan.FromSeconds(0);
var printSnapshots = false;

Expand All @@ -28,7 +28,7 @@
measurer.Start();

IPEndPoint peer2Endpoint = new(IPAddress.Loopback, 9001);
_ = peer1.SendTo(peer2Endpoint.Serialize(), PingMessage.Ping).AsTask();
peer1.TrySendTo(peer2Endpoint.Serialize(), PingMessage.Ping);

Console.WriteLine("Press enter to stop.");
SpinWait.SpinUntil(() => Console.KeyAvailable || stopToken.IsCancellationRequested);
Expand All @@ -42,6 +42,7 @@
Console.Clear();
Console.WriteLine(measurer.Summary(printSnapshots));


IPeerClient<PingMessage> CreateClient(int port)
{
PeerObserverGroup<PingMessage> observers = new();
Expand Down
12 changes: 6 additions & 6 deletions benchmarks/Backdash.Benchmarks/Cases/UdpClientBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ void OnProcessed(long count)
IPEndPoint pongerEndpoint = new(IPAddress.Loopback, 9001);
var pongerAddress = pongerEndpoint.Serialize();

Task[] tasks =
[
Trace.Assert(pinger.TrySendTo(pongerAddress, PingMessage.Ping));

await Task.WhenAll(
pinger.Start(ct),
ponger.Start(ct),
pinger.SendTo(pongerAddress, PingMessage.Ping, null, ct).AsTask(),
];
await Task.WhenAll(tasks).ConfigureAwait(false);
ponger.Start(ct)
).ConfigureAwait(false);

pingerHandler.OnProcessed -= OnProcessed;
Trace.Assert(pingerHandler.BadMessages is 0,
$"** Pinger: {pingerHandler.BadMessages} bad messages");
Expand Down
22 changes: 5 additions & 17 deletions benchmarks/Backdash.Benchmarks/Network/Message.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Diagnostics;
using System.Net;
using Backdash.Network.Client;

Expand All @@ -18,35 +19,22 @@ sealed class PingMessageHandler(string name, IPeerClient<PingMessage> sender) :
public long BadMessages => badMessages;
public event Action<long> OnProcessed = delegate { };

public async ValueTask OnPeerMessage(
PingMessage message,
SocketAddress from,
int bytesReceived,
CancellationToken stoppingToken
public void OnPeerMessage(in PingMessage message, SocketAddress from, int bytesReceived
)
{
if (stoppingToken.IsCancellationRequested)
return;
if (stoppingToken.IsCancellationRequested)
return;
Interlocked.Increment(ref processedCount);

if (!Enum.IsDefined(message))
Interlocked.Increment(ref badMessages);

var reply = message switch
{
PingMessage.Ping => PingMessage.Pong,
PingMessage.Pong => PingMessage.Ping,
_ => throw new ArgumentOutOfRangeException(nameof(message), message, null),
};
try
{
await sender.SendTo(from, reply, null, stoppingToken);
}
catch (OperationCanceledException)
{
// skip
}

Trace.Assert(sender.TrySendTo(from, reply));
OnProcessed(processedCount);
#if DEBUG
Console.WriteLine(
Expand Down
6 changes: 3 additions & 3 deletions samples/SpaceWar.Lobby/Scenes/LobbyScene.cs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ void StartPlayerBattleScene()
players.Add(player.PeerId == user.PeerId
? new LocalPlayer(playerNumber)
: new RemotePlayer(playerNumber,
lobbyUdpClient.GetFallbackEndpoint(user, player)));
LobbyUdpClient.GetFallbackEndpoint(user, player)));
}

if (lobbyInfo.SpectatorMapping.SingleOrDefault(m => m.Host == user.PeerId)
Expand All @@ -373,7 +373,7 @@ void StartPlayerBattleScene()
var spectators = lobbyInfo.Spectators.Where(s => spectatorIds.Contains(s.PeerId));
foreach (var spectator in spectators)
{
var spectatorEndpoint = lobbyUdpClient.GetFallbackEndpoint(user, spectator);
var spectatorEndpoint = LobbyUdpClient.GetFallbackEndpoint(user, spectator);
players.Add(new Spectator(spectatorEndpoint));
}
}
Expand All @@ -388,7 +388,7 @@ void StartSpectatorBattleScene()
?.Host;
var host = lobbyInfo.Players.Single(x => x.PeerId == hostId);
var playerCount = lobbyInfo.Players.Length;
var hostEndpoint = lobbyUdpClient.GetFallbackEndpoint(user, host);
var hostEndpoint = LobbyUdpClient.GetFallbackEndpoint(user, host);


Window.Title = $"Space War {Config.LocalPort} - {user.Username} watching {host.Username}";
Expand Down
2 changes: 1 addition & 1 deletion samples/SpaceWar.Lobby/Services/LobbyUdpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async ValueTask Receive(CancellationToken stoppingToken)
}

// Use local IP when over same network
public IPEndPoint GetFallbackEndpoint(User user, Peer peer)
public static IPEndPoint GetFallbackEndpoint(User user, Peer peer)
{
if (Equals(peer.Endpoint.Address, user.IP) && peer.LocalEndpoint is not null)
return peer.LocalEndpoint;
Expand Down
2 changes: 1 addition & 1 deletion samples/SpaceWar.Shared/Logic/GameState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void Init(int numberOfPlayers)
Missile: false
);

public GameInput ParseShipInputs(PlayerInputs inputs, in Ship ship)
public static GameInput ParseShipInputs(PlayerInputs inputs, in Ship ship)
{
if (!ship.Active)
return new();
Expand Down
4 changes: 2 additions & 2 deletions src/Backdash/Backends/Peer2PeerBackend.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ sealed class Peer2PeerBackend<TInput, TGameState> : IRollbackSession<TInput, TGa
readonly Synchronizer<TInput, TGameState> synchronizer;
readonly ConnectionsState localConnections;
readonly IBackgroundJobManager backgroundJobManager;
readonly ProtocolInputEventQueue<TInput> peerInputEventQueue;
readonly IProtocolInputEventQueue<TInput> peerInputEventQueue;
readonly IProtocolInputEventPublisher<ConfirmedInputs<TInput>> peerCombinedInputsEventPublisher;
readonly PeerConnectionFactory peerConnectionFactory;
readonly List<PeerConnection<ConfirmedInputs<TInput>>> spectators;
Expand Down Expand Up @@ -72,7 +72,7 @@ BackendServices<TInput, TGameState> services
Random = services.DeterministicRandom;
syncNumber = services.Random.MagicNumber();

peerInputEventQueue = new();
peerInputEventQueue = new ProtocolInputEventQueue<TInput>();
peerCombinedInputsEventPublisher = new ProtocolCombinedInputsEventPublisher<TInput>(peerInputEventQueue);
inputGroupSerializer = new ConfirmedInputsSerializer<TInput>(inputSerializer);
localConnections = new(Max.NumberOfPlayers);
Expand Down
2 changes: 1 addition & 1 deletion src/Backdash/Network/Client/PeerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public async Task StartReceiving(CancellationToken cancellationToken)
try
{
serializer.Deserialize(buffer.AsSpan(..receivedSize), ref msg);
await observer.OnPeerMessage(msg, address, receivedSize, cancellationToken).ConfigureAwait(false);
observer.OnPeerMessage(in msg, address, receivedSize);
}
catch (NetcodeDeserializationException ex)
{
Expand Down
21 changes: 13 additions & 8 deletions src/Backdash/Network/Client/PeerObserver.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
using System.Net;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

namespace Backdash.Network.Client;

/// <summary>
/// Observe a <see cref="IPeerClient{T}"/>
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IPeerObserver<in T> where T : struct
public interface IPeerObserver<T> where T : struct
{
/// <summary>
/// Handle new message from peer
/// </summary>
ValueTask OnPeerMessage(T message, SocketAddress from, int bytesReceived, CancellationToken stoppingToken);
void OnPeerMessage(in T message, SocketAddress from, int bytesReceived);
}

sealed class PeerObserverGroup<T> : IPeerObserver<T>
Expand All @@ -22,12 +23,16 @@ sealed class PeerObserverGroup<T> : IPeerObserver<T>
public void Add(IPeerObserver<T> observer) => observers.Add(observer);
public void Remove(IPeerObserver<T> observer) => observers.Remove(observer);

[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
public async ValueTask OnPeerMessage(
T message, SocketAddress from, int bytesReceived, CancellationToken stoppingToken
)
public void OnPeerMessage(in T message, SocketAddress from, int bytesReceived)
{
for (var i = 0; i < observers.Count; i++)
await observers[i].OnPeerMessage(message, from, bytesReceived, stoppingToken).ConfigureAwait(false);
var span = CollectionsMarshal.AsSpan(observers);
ref var pointer = ref MemoryMarshal.GetReference(span);
ref var end = ref Unsafe.Add(ref pointer, span.Length);

while (Unsafe.IsAddressLessThan(ref pointer, ref end))
{
pointer.OnPeerMessage(in message, from, bytesReceived);
pointer = ref Unsafe.Add(ref pointer, 1)!;
}
}
}
4 changes: 2 additions & 2 deletions src/Backdash/Network/PeerConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ IProtocolInputEventPublisher<TInput> inputEventQueue
var timeSync = new TimeSync<TInput>(timeSyncOptions, logger);
var outbox = new ProtocolOutbox(state, peer, clock, logger);
var syncManager = new ProtocolSynchronizer(logger, clock, random, state, options, outbox, networkEventHandler);
var inbox = new ProtocolInbox<TInput>(
options, inputSerializer, state, clock, syncManager, outbox, networkEventHandler, inputEventQueue, logger);
var inbox = new ProtocolInbox<TInput>(options, inputSerializer, state, clock, syncManager, outbox,
networkEventHandler, inputEventQueue, logger);
var inputBuffer =
new ProtocolInputBuffer<TInput>(options, inputSerializer, state, logger, timeSync, outbox, inbox);

Expand Down
1 change: 0 additions & 1 deletion src/Backdash/Network/Protocol/Comm/IMessageSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,5 @@ namespace Backdash.Network.Protocol.Comm;

interface IMessageSender
{
ValueTask SendMessageAsync(in ProtocolMessage msg, CancellationToken ct);
bool SendMessage(in ProtocolMessage msg);
}
25 changes: 10 additions & 15 deletions src/Backdash/Network/Protocol/Comm/ProtocolInbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,7 @@ Logger logger
public GameInput<TInput> LastReceivedInput => lastReceivedInput;
public Frame LastAckedFrame { get; private set; } = Frame.Null;

[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
public async ValueTask OnPeerMessage(
ProtocolMessage message,
SocketAddress from,
int bytesReceived,
CancellationToken stoppingToken
)
public void OnPeerMessage(in ProtocolMessage message, SocketAddress from, int bytesReceived)
{
if (!from.Equals(state.PeerAddress.Address))
return;
Expand Down Expand Up @@ -70,17 +64,18 @@ CancellationToken stoppingToken
var skipped = (ushort)(seqNum - nextReceivedSeq);
if (skipped > options.MaxSequenceDistance)
{
logger.Write(LogLevel.Debug, $"dropping out of order packet (seq: {seqNum}, last seq:{nextReceivedSeq})");
logger.Write(LogLevel.Debug,
$"dropping out of order packet (seq: {seqNum}, last seq:{nextReceivedSeq})");
return;
}
}

nextReceivedSeq = seqNum;
logger.Write(LogLevel.Trace, $"recv {message} from {state.Player}");
if (HandleMessage(ref message, out var replyMsg))
if (HandleMessage(in message, out var replyMsg))
{
if (replyMsg.Header.Type is not MessageType.Unknown)
await messageSender.SendMessageAsync(in replyMsg, stoppingToken).ConfigureAwait(false);
if (replyMsg.Header.Type is not MessageType.Unknown && !messageSender.SendMessage(in replyMsg))
logger.Write(LogLevel.Warning, $"inbox response dropped (seq: {seqNum})");

state.Stats.Received.LastTime = clock.GetTimeStamp();
state.Stats.Received.TotalPackets++;
Expand All @@ -93,14 +88,14 @@ CancellationToken stoppingToken
}
}

bool HandleMessage(ref ProtocolMessage message, out ProtocolMessage replyMsg)
bool HandleMessage(in ProtocolMessage message, out ProtocolMessage replyMsg)
{
replyMsg = new(MessageType.Unknown);
var handled = message.Header.Type switch
{
MessageType.SyncRequest => OnSyncRequest(in message, ref replyMsg),
MessageType.SyncReply => OnSyncReply(in message, ref replyMsg),
MessageType.Input => OnInput(ref message.Input),
MessageType.Input => OnInput(in message.Input),
MessageType.QualityReport => OnQualityReport(in message, out replyMsg),
MessageType.QualityReply => OnQualityReply(in message),
MessageType.InputAck => OnInputAck(in message),
Expand All @@ -112,7 +107,7 @@ bool HandleMessage(ref ProtocolMessage message, out ProtocolMessage replyMsg)
return handled;
}

bool OnInput(ref InputMessage msg)
bool OnInput(in InputMessage msg)
{
logger.Write(LogLevel.Trace, $"Acked Frame: {LastAckedFrame}");
/*
Expand Down Expand Up @@ -158,7 +153,7 @@ in remoteStatus[i].LastFrame
lastReceivedFrame = msg.StartFrame.Previous();
var nextFrame = lastReceivedFrame.Next();
var currentFrame = msg.StartFrame;
var decompressor = InputEncoder.GetDecompressor(ref msg);
var decompressor = InputEncoder.GetDecompressor(ref Unsafe.AsRef(in msg));
if (currentFrame < nextFrame)
{
var framesAhead = nextFrame.Number - currentFrame.Number;
Expand Down
3 changes: 0 additions & 3 deletions src/Backdash/Network/Protocol/Comm/ProtocolOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ Logger logger
{
int nextSendSeq;

public ValueTask SendMessageAsync(in ProtocolMessage msg, CancellationToken ct) =>
peer.SendTo(state.PeerAddress.Address, in msg, this, ct);

public bool SendMessage(in ProtocolMessage msg) => peer.TrySendTo(state.PeerAddress.Address, in msg, this);

public void BeforeSendMessage(ref ProtocolMessage message)
Expand Down
Loading

0 comments on commit 838f0fa

Please sign in to comment.