Skip to content

Commit

Permalink
link events
Browse files Browse the repository at this point in the history
  • Loading branch information
caunt committed Jul 20, 2024
1 parent d93fe8e commit 33dd93e
Show file tree
Hide file tree
Showing 11 changed files with 105 additions and 34 deletions.
6 changes: 3 additions & 3 deletions src/Void.Proxy-before/Models/General/Player.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ static byte[] TwosComplement(byte[] data)
for (var i = data.Length - 1; i >= 0; i--)
{
data[i] = unchecked((byte)~data[i]);

if (!carry)
continue;

carry = data[i] == 0xFF;
data[i]++;
}
Expand Down Expand Up @@ -127,7 +127,7 @@ static byte[] TwosComplement(byte[] data)

if (GameProfile == null || IdentifiedKey == null || IdentifiedKey.Revision != IdentifiedKeyRevision.LINKED_V2)
return GameProfile;

if (!IdentifiedKey.AddGuid(GameProfile.Id))
throw new Exception("multiplayer.disconnect.invalid_public_key");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ public struct PlayerInfoUpdatePacket : IMinecraftPacket<PlayState>
public void Encode(ref MinecraftBuffer buffer, ProtocolVersion protocolVersion)
{
// if no players available, notchian server still send actions bitset, we dont?
var actions = Players.SelectMany(player => player.Actions.Select(action => action.Type));
var actions = Players.SelectMany(player => player.Actions.Select(action => action.Type)).ToArray();
var actionsFlags = (byte)actions.Aggregate(0, (current, action) => current | (byte)action);

buffer.WriteUnsignedByte(actionsFlags);
buffer.WriteVarInt(Players.Count);

foreach (var player in Players)
{
var missingActions = actions.Where(actionType => player.Actions.All(action => action.Type != actionType));
var missingActions = actions.Where(actionType => player.Actions.All(action => action.Type != actionType)).ToArray();

if (missingActions.Any())
throw new Exception($"Player {player.Guid} has missing actions: {string.Join(", ", missingActions)}");
Expand Down Expand Up @@ -54,6 +54,7 @@ public void Decode(ref MinecraftBuffer buffer, ProtocolVersion protocolVersion)
var playerActions = new List<IPlayerAction>();

foreach (var action in actions)
{
playerActions.Add(action switch
{
PlayerActionType.AddPlayer => new AddPlayerAction(ref buffer, protocolVersion),
Expand All @@ -64,6 +65,7 @@ public void Decode(ref MinecraftBuffer buffer, ProtocolVersion protocolVersion)
PlayerActionType.UpdateDisplayName => new UpdateDisplayNameAction(ref buffer, protocolVersion),
_ => throw new ArgumentOutOfRangeException($"Unknown update player action type {action}")
});
}

Players.Add(new PlayerInfo(playerGuid, playerActions));
}
Expand Down
15 changes: 15 additions & 0 deletions src/Void.Proxy.API/Events/Links/CreateLinkEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using Void.Proxy.API.Links;
using Void.Proxy.API.Network.IO.Channels;
using Void.Proxy.API.Players;
using Void.Proxy.API.Servers;

namespace Void.Proxy.API.Events.Links;

public class CreateLinkEvent : IEventWithResult<ILink>
{
public required IPlayer Player { get; init; }
public required IServer Server { get; init; }
public required IMinecraftChannel PlayerChannel { get; init; }
public required IMinecraftChannel ServerChannel { get; init; }
public ILink? Result { get; set; }
}
8 changes: 8 additions & 0 deletions src/Void.Proxy.API/Events/Links/StartLinkEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using Void.Proxy.API.Links;

namespace Void.Proxy.API.Events.Links;

public class StartLinkEvent : IEvent
{
public required ILink Link { get; init; }
}
8 changes: 8 additions & 0 deletions src/Void.Proxy.API/Events/Links/StopLinkEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using Void.Proxy.API.Links;

namespace Void.Proxy.API.Events.Links;

public class StopLinkEvent : IEvent
{
public required ILink Link { get; init; }
}
4 changes: 2 additions & 2 deletions src/Void.Proxy.API/Events/Services/IEventService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ public interface IEventService
{
public ValueTask ThrowAsync<T>(CancellationToken cancellationToken = default) where T : IEvent, new();
public ValueTask ThrowAsync<T>(T @event, CancellationToken cancellationToken = default) where T : IEvent;
public void RegisterListeners(IEventListener[] listeners);
public void UnregisterListeners(IEventListener[] listeners);
public void RegisterListeners(params IEventListener[] listeners);
public void UnregisterListeners(params IEventListener[] listeners);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using Void.Proxy.API.Network.IO.Buffers;
using Void.Proxy.API.Network.IO.Channels;
using Void.Proxy.API.Network.IO.Streams;
using Void.Proxy.API.Network.IO.Streams.Codec;
using Void.Proxy.API.Network.Protocol;
using Void.Proxy.API.Plugins;

Expand Down
4 changes: 2 additions & 2 deletions src/Void.Proxy/Events/EventService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public async ValueTask ThrowAsync<T>(T @event, CancellationToken cancellationTok
}
}

public void RegisterListeners(IEventListener[] listeners)
public void RegisterListeners(params IEventListener[] listeners)
{
foreach (var listener in listeners)
{
Expand All @@ -56,7 +56,7 @@ public void RegisterListeners(IEventListener[] listeners)
_listeners.AddRange(listeners);
}

public void UnregisterListeners(IEventListener[] listeners)
public void UnregisterListeners(params IEventListener[] listeners)
{
foreach (var listener in listeners)
{
Expand Down
21 changes: 12 additions & 9 deletions src/Void.Proxy/Links/Link.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using Nito.AsyncEx;
using Void.Proxy.API.Events;
using Void.Proxy.API.Events.Links;
using Void.Proxy.API.Events.Services;
using Void.Proxy.API.Links;
using Void.Proxy.API.Network.IO.Channels;
using Void.Proxy.API.Network.IO.Messages;
Expand All @@ -15,23 +18,23 @@ public class Link : ILink
private readonly CancellationTokenSource _ctsServerToPlayer;
private readonly CancellationTokenSource _ctsServerToPlayerForce;

private readonly Func<ILink, ValueTask> _finalizer;
private readonly AsyncLock _lock;
private readonly ILogger<Link> _logger;
private readonly IEventService _events;
private readonly AsyncLock _lock;

private readonly Task _playerToServerTask;
private readonly Task _serverToPlayerTask;

public Link(IPlayer player, IServer server, IMinecraftChannel playerChannel, IMinecraftChannel serverChannel, Func<ILink, ValueTask> finalize)
public Link(IPlayer player, IServer server, IMinecraftChannel playerChannel, IMinecraftChannel serverChannel, IEventService events)
{
Player = player;
Server = server;
PlayerChannel = playerChannel;
ServerChannel = serverChannel;

_lock = new AsyncLock();
_finalizer = finalize;
_logger = player.Scope.ServiceProvider.GetRequiredService<ILogger<Link>>();
_events = events;
_lock = new AsyncLock();

_ctsPlayerToServer = new CancellationTokenSource();
_ctsPlayerToServerForce = new CancellationTokenSource();
Expand Down Expand Up @@ -62,7 +65,7 @@ public async ValueTask DisposeAsync()
{
_logger.LogInformation("Timed out waiting Server {Server} disconnection from Player {Player} manually, closing forcefully", Server, Player);
await _ctsPlayerToServerForce.CancelAsync();

if (await WaitWithTimeout(_serverToPlayerTask))
throw new Exception($"Cannot dispose Link {this} (player=>server)");
}
Expand Down Expand Up @@ -124,8 +127,8 @@ protected async Task ExecuteAsync(IMinecraftChannel sourceChannel, IMinecraftCha
catch (Exception exception) when (exception is EndOfStreamException or IOException or TaskCanceledException or OperationCanceledException or ObjectDisposedException)
{
// client disconnected itself
// server catch unhandled exception
// link does server switch
// or server catch unhandled exception
// or link does server switch
}
catch (Exception exception)
{
Expand All @@ -136,7 +139,7 @@ protected async Task ExecuteAsync(IMinecraftChannel sourceChannel, IMinecraftCha
await PlayerChannel.FlushAsync();
await ServerChannel.FlushAsync();

_ = _finalizer(this);
_ = _events.ThrowAsync(new StopLinkEvent { Link = this }, forceCancellationToken);
}
}
}
63 changes: 49 additions & 14 deletions src/Void.Proxy/Links/LinkService.cs
Original file line number Diff line number Diff line change
@@ -1,38 +1,73 @@
using Void.Proxy.API.Links;
using Void.Proxy.API.Events;
using Void.Proxy.API.Events.Links;
using Void.Proxy.API.Events.Services;
using Void.Proxy.API.Links;
using Void.Proxy.API.Network.IO.Channels;
using Void.Proxy.API.Players;
using Void.Proxy.API.Servers;

namespace Void.Proxy.Links;

public class LinkService(
ILogger<LinkService> logger,
IServerService servers) : ILinkService
public class LinkService : ILinkService, IEventListener
{
private readonly List<ILink> _links = [];
private readonly List<ILink> _links =[];

private readonly ILogger<LinkService> _logger;
private readonly IServerService _servers;
private readonly IEventService _events;

public LinkService(ILogger<LinkService> logger, IServerService servers, IEventService events)
{
_logger = logger;
_servers = servers;
_events = events;

events.RegisterListeners(this);
}

public async ValueTask ConnectPlayerAnywhereAsync(IPlayer player)
{
var server = servers.RegisteredServers.First();
var server = _servers.RegisteredServers.First();

var playerChannel = await player.GetChannelAsync();
var serverChannel = await player.BuildServerChannelAsync(server);

var link = new Link(player, server, playerChannel, serverChannel, FinalizeAsync);
var link = await CreateLinkAsync(player, server, playerChannel, serverChannel);
_links.Add(link);

logger.LogInformation("Started forwarding {Link} traffic", link);
_logger.LogInformation("Started forwarding {Link} traffic", link);
}

private async ValueTask FinalizeAsync(ILink link)
[Subscribe]
public async ValueTask OnStopLinkEvent(StopLinkEvent @event)
{
if (!_links.Remove(link))
if (!_links.Remove(@event.Link))
return;

await link.DisposeAsync();
await using var _ = @event.Link;

if (@event.Link.IsAlive)
throw new Exception($"Link {@event.Link} is still alive");

if (link.IsAlive)
throw new Exception($"Link {link} is still alive");
_logger.LogInformation("Stopped forwarding {Link} traffic", @event.Link);
}

private async ValueTask<ILink> CreateLinkAsync(IPlayer player, IServer server, IMinecraftChannel playerChannel, IMinecraftChannel serverChannel)
{
var @event = new CreateLinkEvent
{
Player = player,
Server = server,
PlayerChannel = playerChannel,
ServerChannel = serverChannel
};

await _events.ThrowAsync(@event);

var link = @event.Result ?? new Link(player, server, playerChannel, serverChannel, _events);

logger.LogInformation("Stopped forwarding {Link} traffic", link);
await _events.ThrowAsync(new StartLinkEvent { Link = link });

return link;
}
}
3 changes: 2 additions & 1 deletion src/Void.Proxy/Platform.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System.Net.Sockets;
using System.Reflection;
using System.Security.Cryptography;
using Serilog.Core;
using Serilog.Events;
using Void.Proxy.API;
Expand Down Expand Up @@ -56,6 +55,7 @@ public async Task StopAsync(CancellationToken cancellationToken)
// TODO disconnect everyone here

if (_backgroundTask is not null)
{
await _backgroundTask.ContinueWith(backgroundTask =>
{
if (backgroundTask.IsCanceled)
Expand All @@ -67,6 +67,7 @@ await _backgroundTask.ContinueWith(backgroundTask =>
throw backgroundTask.Exception?.Flatten()
.InnerException ?? new Exception("Proxy stopped with unknown exception");
}, cancellationToken);
}

_listener?.Stop();

Expand Down

0 comments on commit 33dd93e

Please sign in to comment.