Skip to content

Commit

Permalink
reimplement channel modifications, implemented channel pause
Browse files Browse the repository at this point in the history
  • Loading branch information
caunt committed Sep 29, 2024
1 parent 7a24cb1 commit 13d238d
Show file tree
Hide file tree
Showing 23 changed files with 476 additions and 116 deletions.
6 changes: 4 additions & 2 deletions src/API/Extensions/TaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace Void.Proxy.API.Extensions;
using Serilog;

namespace Void.Proxy.API.Extensions;

public static class TaskExtensions
{
Expand All @@ -7,7 +9,7 @@ public static Task CatchExceptions(this Task task)
return task.ContinueWith(completedTask =>
{
if (completedTask.Exception != null)
Console.WriteLine("Unhandled task Exception:\n" + completedTask.Exception.InnerException);
Log.Logger.Fatal("Unhandled task Exception:\n{Exception}", completedTask.Exception.InnerException);
}, TaskContinuationOptions.OnlyOnFaulted);
}

Expand Down
1 change: 0 additions & 1 deletion src/API/Links/ILink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,4 @@ public interface ILink : IEventListener, IAsyncDisposable
public IMinecraftChannel ServerChannel { get; }
public bool IsAlive { get; }
public bool IsRestarting { get; }
public ValueTask RestartAsync(CancellationToken cancellationToken = default);
}
99 changes: 99 additions & 0 deletions src/API/Mojang/Profiles/IdentifiedKey.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
using System.Security.Cryptography;
using System.Text;
using Void.Proxy.API.Network.IO.Buffers;
using Void.Proxy.API.Network.Protocol;

namespace Void.Proxy.API.Mojang.Profiles;

public record IdentifiedKey(IdentifiedKeyRevision Revision, long ExpiresAt, byte[] PublicKey, byte[] Signature)
{
public static readonly byte[] YggdrasilSessionPublicKey = Convert.FromBase64String("MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAylB4B6m5lz7jwrcFz6Fd/fnfUhcvlxsTSn5kIK/2aGG1C3kMy4VjhwlxF6BFUSnfxhNswPjh3ZitkBxEAFY25uzkJFRwHwVA9mdwjashXILtR6OqdLXXFVyUPIURLOSWqGNBtb08EN5fMnG8iFLgEJIBMxs9BvF3s3/FhuHyPKiVTZmXY0WY4ZyYqvoKR+XjaTRPPvBsDa4WI2u1zxXMeHlodT3lnCzVvyOYBLXL6CJgByuOxccJ8hnXfF9yY4F0aeL080Jz/3+EBNG8RO4ByhtBf4Ny8NQ6stWsjfeUIvH7bU/4zCYcYOq4WrInXHqS8qruDmIl7P5XXGcabuzQstPf/h2CRAUpP/PlHXcMlvewjmGU6MfDK+lifScNYwjPxRo4nKTGFZf/0aqHCh/EAsQyLKrOIYRE0lDG3bzBh8ogIMLAugsAfBb6M3mqCqKaTMAf/VAjh5FFJnjS+7bE+bZEV0qwax1CEoPPJL1fIQjOS8zj086gjpGRCtSy9+bTPTfTR/SJ+VUB5G2IeCItkNHpJX2ygojFZ9n5Fnj7R9ZnOM+L8nyIjPu3aePvtcrXlyLhH/hvOfIOjPxOlqW+O5QwSFP4OEcyLAUgDdUgyW36Z5mB285uKW/ighzZsOTevVUG2QwDItObIV6i8RCxFbN2oDHyPaO5j1tTaBNyVt8CAwEAAQ==");

private bool? _isSignatureValid;

public Uuid? ProfileUuid { get; set; }

public bool? IsSignatureValid
{
get => _isSignatureValid ??= ValidateData(ProfileUuid ?? default);
set => _isSignatureValid = value;
}

public bool VerifyDataSignature(byte[] signature, byte[] data)
{
try
{
using var rsa = RSA.Create();
rsa.ImportSubjectPublicKeyInfo(PublicKey, out _);

return rsa.VerifyData(data, signature, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1);
}
catch
{
return false;
}
}

public bool AddUuid(Uuid uuid)
{
var guid = uuid.AsGuid;

if (guid == default)
return false;

var profileGuid = ProfileUuid?.AsGuid;

if (profileGuid != null)
return IsSignatureValid.HasValue && IsSignatureValid.Value && profileGuid.Equals(guid);

if (!ValidateData(uuid))
return false;

IsSignatureValid = true;
ProfileUuid = uuid;

return true;
}

private bool ValidateData(Uuid uuid)
{
var guid = uuid.AsGuid;

if (Revision == IdentifiedKeyRevision.GenericV1Revision)
{
var publicKeyText = $"-----BEGIN RSA PUBLIC KEY-----\n{Convert.ToBase64String(PublicKey, Base64FormattingOptions.InsertLineBreaks)}\n-----END RSA PUBLIC KEY-----\n";
var verify = Encoding.ASCII.GetBytes(ExpiresAt + publicKeyText.Replace("\r", string.Empty));

using var rsa = RSA.Create();
rsa.ImportSubjectPublicKeyInfo(YggdrasilSessionPublicKey, out _);

return rsa.VerifyData(verify, Signature, HashAlgorithmName.SHA1, RSASignaturePadding.Pkcs1);
}
else
{
if (guid == default)
return false;

var verify = new byte[PublicKey.Length + 24];
var buffer = new MinecraftBuffer(verify);
buffer.WriteUuid(uuid);
buffer.WriteLong(ExpiresAt);
buffer.Write(PublicKey);

using var rsa = RSA.Create();
rsa.ImportSubjectPublicKeyInfo(YggdrasilSessionPublicKey, out _);

return rsa.VerifyData(verify.AsSpan(0, buffer.Position), Signature, HashAlgorithmName.SHA1, RSASignaturePadding.Pkcs1);
}
}
}

public class IdentifiedKeyRevision(IEnumerable<IdentifiedKeyRevision> backwardsCompatibleTo, List<ProtocolVersion> applicableTo)
{
public static readonly IdentifiedKeyRevision GenericV1Revision = new([], [ProtocolVersion.MINECRAFT_1_19]);

public static readonly IdentifiedKeyRevision LinkedV2Revision = new([], [ProtocolVersion.MINECRAFT_1_19_1]);

public IEnumerable<IdentifiedKeyRevision> BackwardsCompatibleTo { get; } = backwardsCompatibleTo;
public List<ProtocolVersion> ApplicableTo { get; } = applicableTo;
}
2 changes: 2 additions & 0 deletions src/API/Mojang/Uuid.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace Void.Proxy.API.Mojang;

public struct Uuid(Guid guid)
{
public static Uuid Empty { get; } = new(Guid.Empty);

public Guid AsGuid => guid;

public override string ToString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public ReadOnlySpan<byte> Slice(int length)

// Should be safe in most cases, but prefer throw new NotSupportedException("That implementation would allocate memory, not supported yet")
if (_currentBlock.Length < _blockPosition + length)
throw new IndexOutOfRangeException($"Current block length is {_currentBlock.Length} and position is {_blockPosition}, attempted to slice {length} bytes, reading from next blocks not implemented.");
throw new IndexOutOfRangeException($"Current block length is {_currentBlock.Length} and position is {_blockPosition}, attempted to slice {length} bytes, reading from next blocks not implemented. Sequence length is {_sequence.Length}.");

var span = _currentBlock.Slice(_blockPosition, length);

Expand Down
5 changes: 4 additions & 1 deletion src/API/Network/IO/Channels/IMinecraftChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ public interface IMinecraftChannel : IDisposable, IAsyncDisposable
public bool CanWrite { get; }

public IMinecraftStreamBase Head { get; }

public bool IsConfigured { get; }
public bool IsPaused { get; }
public bool IsRedirectionSupported { get; }

public void Add<T>() where T : IMinecraftStream, new();
Expand All @@ -20,7 +22,8 @@ public interface IMinecraftChannel : IDisposable, IAsyncDisposable
public void PrependBuffer(Memory<byte> memory);
public ValueTask<IMinecraftMessage> ReadMessageAsync(CancellationToken cancellationToken = default);
public ValueTask WriteMessageAsync(IMinecraftMessage message, CancellationToken cancellationToken = default);
public void Flush();
public void Pause();
public void Resume();
public ValueTask FlushAsync(CancellationToken cancellationToken = default);
public void Close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ public abstract class MinecraftRecyclableStream
{
public static readonly RecyclableMemoryStreamManager RecyclableMemoryStreamManager = new(new RecyclableMemoryStreamManager.Options
{
BlockSize = 1024,
// TODO: replace BlockSize to 1024, but that will cause some packets to be unable to read
BlockSize = 2048,
LargeBufferMultiple = 1024 * 1024,
MaximumBufferSize = 16 * 1024 * 1024,
GenerateCallStacks = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ public async ValueTask<IMinecraftChannel> BuildPlayerChannelAsync(IPlayer player
public async ValueTask<IMinecraftChannel> BuildServerChannelAsync(IServer server, CancellationToken cancellationToken = default)
{
logger.LogTrace("Building channel for a {Server} server", server);
var channel = await BuildChannelAsync(Direction.Clientbound, server.CreateTcpClient().GetStream(), cancellationToken);

var client = server.CreateTcpClient();
client.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);

var channel = await BuildChannelAsync(Direction.Clientbound, client.GetStream(), cancellationToken);
logger.LogTrace("Server {Name} is using {ChannelTypeName} channel implementation", server.Name, channel.GetType().Name);

return channel;
Expand Down
27 changes: 25 additions & 2 deletions src/Common/Network/IO/Channels/SimpleChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@ namespace Void.Proxy.Common.Network.IO.Channels;

public class SimpleChannel(IMinecraftStreamBase head) : IMinecraftChannel
{
private TaskCompletionSource? _pause;

public bool CanRead => true;
public bool CanWrite => true;

public IMinecraftStreamBase Head => head;

public bool IsConfigured => head is IMinecraftStream;
public bool IsPaused => _pause is { Task.IsCompleted: false };
public bool IsRedirectionSupported => false;

public void Add<T>() where T : IMinecraftStream, new()
Expand Down Expand Up @@ -56,6 +60,9 @@ public void PrependBuffer(Memory<byte> memory)

public async ValueTask<IMinecraftMessage> ReadMessageAsync(CancellationToken cancellationToken = default)
{
if (_pause is not null)
await _pause.Task;

return head switch
{
IMinecraftPacketMessageStream stream => await stream.ReadPacketAsync(cancellationToken),
Expand Down Expand Up @@ -83,9 +90,20 @@ public async ValueTask WriteMessageAsync(IMinecraftMessage message, Cancellation
}
}

public void Flush()
public void Pause()
{
head.Flush();
if (_pause is { Task.IsCompleted: false })
throw new InvalidOperationException($"{nameof(IMinecraftChannel)} is already paused");

_pause = new TaskCompletionSource();
}

public void Resume()
{
if (_pause is null or { Task.IsCompleted: true })
throw new InvalidOperationException($"{nameof(IMinecraftChannel)} is not paused");

_pause.SetResult();
}

public async ValueTask FlushAsync(CancellationToken cancellationToken = default)
Expand All @@ -108,6 +126,11 @@ public async ValueTask DisposeAsync()
await head.DisposeAsync();
}

public void Flush()
{
head.Flush();
}

private T Get<T>(IMinecraftStreamBase? baseStream) where T : IMinecraftStreamBase
{
var current = baseStream ?? head;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public IMinecraftPacket DecodePacket(RecyclableMemoryStream stream)
stream.Dispose();

if (buffer.HasData)
throw new IndexOutOfRangeException($"The packet was not fully read. Bytes read: {buffer.Position}, Total length: {buffer.Length}.");
throw new IndexOutOfRangeException($"{packet} packet was not fully read. Bytes read: {buffer.Position}, Total length: {buffer.Length}.");

return packet;
}
Expand Down
60 changes: 7 additions & 53 deletions src/Platform/Links/Link.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ namespace Void.Proxy.Links;

public class Link : ILink
{
private readonly CancellationTokenSource _ctsPlayerToServer;
private readonly CancellationTokenSource _ctsPlayerToServerForce;
private readonly CancellationTokenSource _ctsServerToPlayer;
private readonly CancellationTokenSource _ctsServerToPlayerForce;
private readonly IEventService _events;
private readonly AsyncLock _lock;
private readonly ILogger _logger;
private CancellationTokenSource _ctsPlayerToServer;
private CancellationTokenSource _ctsPlayerToServerForce;
private CancellationTokenSource _ctsServerToPlayer;
private CancellationTokenSource _ctsServerToPlayerForce;
private Task _playerToServerTask;
private Task _serverToPlayerTask;
private readonly Task _playerToServerTask;
private readonly Task _serverToPlayerTask;

public Link(IPlayer player, IServer server, IMinecraftChannel playerChannel, IMinecraftChannel serverChannel, ILogger logger, IEventService events)
{
Expand Down Expand Up @@ -50,53 +50,7 @@ public Link(IPlayer player, IServer server, IMinecraftChannel playerChannel, IMi
public IMinecraftChannel ServerChannel { get; init; }

public bool IsAlive => _playerToServerTask.Status == TaskStatus.Running && _serverToPlayerTask.Status == TaskStatus.Running;
public bool IsRestarting { get; private set; }

public async ValueTask RestartAsync(CancellationToken cancellationToken = default)
{
if (IsRestarting)
{
_logger.LogWarning("Link {Link} is already restarting", this);
return;
}

_logger.LogTrace("Link {Link} is restarting", this);

IsRestarting = true;

await _ctsServerToPlayer.CancelAsync();
_ctsServerToPlayer = new CancellationTokenSource();

if (await WaitWithTimeout(_serverToPlayerTask))
{
_logger.LogTrace("Timed out waiting Server {Server} disconnection from Player {Player} manually, closing forcefully (Restart)", Server, Player);
await _ctsServerToPlayerForce.CancelAsync();
_ctsServerToPlayerForce = new CancellationTokenSource();

if (await WaitWithTimeout(_serverToPlayerTask))
throw new Exception($"Cannot dispose Link {this} (player=>server) (Restart)");
}

await _ctsPlayerToServer.CancelAsync();
_ctsPlayerToServer = new CancellationTokenSource();

if (await WaitWithTimeout(_playerToServerTask))
{
_logger.LogTrace("Timed out waiting Player {Player} disconnection from Server {Server} manually, closing forcefully (Restart)", Player, Server);
await _ctsPlayerToServerForce.CancelAsync();
_ctsPlayerToServerForce = new CancellationTokenSource();

if (await WaitWithTimeout(_playerToServerTask))
throw new Exception($"Cannot dispose Link {this} (server=>player) (Restart)");
}

IsRestarting = false;

_playerToServerTask = ExecuteAsync(PlayerChannel, ServerChannel, Direction.Serverbound, _ctsPlayerToServer.Token, _ctsPlayerToServerForce.Token);
_serverToPlayerTask = ExecuteAsync(ServerChannel, PlayerChannel, Direction.Clientbound, _ctsServerToPlayer.Token, _ctsServerToPlayerForce.Token);

_logger.LogTrace("Link {Link} successfully restarted", this);
}
public bool IsRestarting { get; }

public async ValueTask DisposeAsync()
{
Expand Down
1 change: 1 addition & 0 deletions src/Platform/Platform.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public async Task StartAsync(CancellationToken cancellationToken)

logger.LogInformation("Starting connection listener");
_listener = new TcpListener(settings.Address, settings.Port);
_listener.Server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
_listener.Start();

logger.LogInformation("Connection listener started on port {Port}", settings.Port);
Expand Down
Loading

0 comments on commit 13d238d

Please sign in to comment.