From 283244d65c369bd7e3fdb3478619271d71becf7b Mon Sep 17 00:00:00 2001 From: "starlkytminecraft@gmail.com" Date: Tue, 15 Aug 2023 01:37:12 +0300 Subject: [PATCH] Migrate to use `Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets` --- Obsidian/Client.cs | 19 ++-- Obsidian/Commands/MainCommandModule.cs | 6 +- Obsidian/Net/DuplexPipeStream.cs | 139 +++++++++++++++++++++++++ Obsidian/Net/SocketFactory.cs | 22 ++++ Obsidian/Obsidian.csproj | 4 + Obsidian/Server.cs | 35 +++++-- Obsidian/Utilities/Extensions.cs | 9 +- 7 files changed, 209 insertions(+), 25 deletions(-) create mode 100644 Obsidian/Net/DuplexPipeStream.cs create mode 100644 Obsidian/Net/SocketFactory.cs diff --git a/Obsidian/Client.cs b/Obsidian/Client.cs index 6e14cb460..519f16845 100644 --- a/Obsidian/Client.cs +++ b/Obsidian/Client.cs @@ -1,4 +1,5 @@ -using Microsoft.Extensions.Logging; +using Microsoft.AspNetCore.Connections; +using Microsoft.Extensions.Logging; using Obsidian.API.Events; using Obsidian.Concurrency; using Obsidian.Entities; @@ -103,7 +104,7 @@ public sealed class Client : IDisposable /// /// The base network stream used by the . /// - private readonly NetworkStream networkStream; + private readonly DuplexPipeStream networkStream; /// /// Used to continuously send and receive encrypted packets from the client. @@ -113,7 +114,7 @@ public sealed class Client : IDisposable /// /// The socket associated with the . /// - private readonly Socket socket; + private readonly ConnectionContext socket; /// /// The current server configuration. @@ -165,7 +166,7 @@ public sealed class Client : IDisposable /// public string? Brand { get; set; } - public Client(Socket socket, ServerConfiguration config, int playerId, Server originServer) + public Client(ConnectionContext socket, ServerConfiguration config, int playerId, Server originServer) { this.socket = socket; this.config = config; @@ -175,7 +176,7 @@ public Client(Socket socket, ServerConfiguration config, int playerId, Server or LoadedChunks = new(); packetCryptography = new(); handler = new(config); - networkStream = new(socket); + networkStream = new(socket.Transport); minecraftStream = new(networkStream); missedKeepAlives = new List(); @@ -183,7 +184,7 @@ public Client(Socket socket, ServerConfiguration config, int playerId, Server or var blockOptions = new ExecutionDataflowBlockOptions { CancellationToken = cancellationSource.Token, EnsureOrdered = true }; var sendPacketBlock = new ActionBlock(packet => { - if (socket.Connected) + if (socket.IsConnected()) SendPacket(packet); }, blockOptions); @@ -227,7 +228,7 @@ public Client(Socket socket, ServerConfiguration config, int playerId, Server or public async Task StartConnectionAsync() { - while (!cancellationSource.IsCancellationRequested && socket.Connected) + while (!cancellationSource.IsCancellationRequested && socket.IsConnected()) { (var id, var data) = await GetNextPacketAsync(); @@ -712,7 +713,7 @@ internal void SendPacket(IClientboundPacket packet) catch (SocketException) { // Clients can disconnect at any point, causing exception to be raised - if (!socket.Connected) + if (!socket.IsConnected()) { Disconnect(); } @@ -778,7 +779,7 @@ public void Dispose() disposed = true; minecraftStream.Dispose(); - socket.Dispose(); + socket.Abort(); cancellationSource?.Dispose(); GC.SuppressFinalize(this); diff --git a/Obsidian/Commands/MainCommandModule.cs b/Obsidian/Commands/MainCommandModule.cs index e7480a8b1..d6cd6c6ae 100644 --- a/Obsidian/Commands/MainCommandModule.cs +++ b/Obsidian/Commands/MainCommandModule.cs @@ -328,14 +328,12 @@ public async Task SpawnEntityAsync(CommandContext context, string entityType) [Command("stop")] [CommandInfo("Stops the server.", "/stop")] [RequirePermission(permissions: "obsidian.stop")] - public Task StopAsync(CommandContext ctx) + public async Task StopAsync(CommandContext ctx) { var server = (Server)ctx.Server; server.BroadcastMessage($"Stopping server..."); - server.Stop(); - - return Task.CompletedTask; + await server.StopAsync(); } [Command("time")] diff --git a/Obsidian/Net/DuplexPipeStream.cs b/Obsidian/Net/DuplexPipeStream.cs new file mode 100644 index 000000000..b5d2ce928 --- /dev/null +++ b/Obsidian/Net/DuplexPipeStream.cs @@ -0,0 +1,139 @@ +using System.Buffers; +using System.IO; +using System.IO.Pipelines; +using System.Threading; + +namespace Obsidian.Net; + +// https://raw.githubusercontent.com/StarlkYT/BedrockFramework/main/src/Bedrock.Framework/Infrastructure/DuplexPipeStream.cs +// With some small changes. +internal sealed class DuplexPipeStream : Stream +{ + private readonly PipeReader _input; + private readonly PipeWriter _output; + private readonly bool _throwOnCancelled; + private volatile bool _cancelCalled; + + public DuplexPipeStream(IDuplexPipe pipe, bool throwOnCancelled = false) + { + _input = pipe.Input; + _output = pipe.Output; + _throwOnCancelled = throwOnCancelled; + } + + public void CancelPendingRead() + { + _cancelCalled = true; + _input.CancelPendingRead(); + } + + public override bool CanRead => true; + + public override bool CanSeek => false; + + public override bool CanWrite => true; + + public override long Length => throw new NotSupportedException(); + + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + // ValueTask uses .GetAwaiter().GetResult() if necessary + // https://github.com/dotnet/corefx/blob/f9da3b4af08214764a51b2331f3595ffaf162abe/src/System.Threading.Tasks.Extensions/src/System/Threading/Tasks/ValueTask.cs#L156 + return ReadAsyncInternal(new Memory(buffer, offset, count), default).Result; + } + + public override Task ReadAsync(byte[] buffer, int offset, int count, + CancellationToken cancellationToken) + { + return ReadAsyncInternal(new Memory(buffer, offset, count), cancellationToken).AsTask(); + } + + public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) + { + return ReadAsyncInternal(destination, cancellationToken); + } + + public override void Write(byte[] buffer, int offset, int count) + { + WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); + } + + public async override Task WriteAsync(byte[]? buffer, int offset, int count, CancellationToken cancellationToken) + { + if (buffer != null) + { + _output.Write(new ReadOnlySpan(buffer, offset, count)); + } + + await _output.FlushAsync(cancellationToken).ConfigureAwait(false); + } + + public async override ValueTask WriteAsync(ReadOnlyMemory source, + CancellationToken cancellationToken = default) + { + _output.Write(source.Span); + await _output.FlushAsync(cancellationToken).ConfigureAwait(false); + } + + public override void Flush() + { + FlushAsync(CancellationToken.None).GetAwaiter().GetResult(); + } + + public override Task FlushAsync(CancellationToken cancellationToken) + { + return WriteAsync(null, 0, 0, cancellationToken); + } + + private async ValueTask ReadAsyncInternal(Memory destination, CancellationToken cancellationToken) + { + while (true) + { + var result = await _input.ReadAsync(cancellationToken).ConfigureAwait(false); + var readableBuffer = result.Buffer; + try + { + if (_throwOnCancelled && result.IsCanceled && _cancelCalled) + { + // Reset the bool + _cancelCalled = false; + throw new OperationCanceledException(); + } + + if (!readableBuffer.IsEmpty) + { + // buffer.Count is int + var count = (int)Math.Min(readableBuffer.Length, destination.Length); + readableBuffer = readableBuffer.Slice(0, count); + readableBuffer.CopyTo(destination.Span); + return count; + } + + if (result.IsCompleted) + { + return 0; + } + } + finally + { + _input.AdvanceTo(readableBuffer.End, readableBuffer.End); + } + } + } +} diff --git a/Obsidian/Net/SocketFactory.cs b/Obsidian/Net/SocketFactory.cs new file mode 100644 index 000000000..8549c97a1 --- /dev/null +++ b/Obsidian/Net/SocketFactory.cs @@ -0,0 +1,22 @@ +using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using System.Net; +using System.Threading; + +namespace Obsidian.Net; + +internal static class SocketFactory +{ + public static async Task CreateListenerAsync(IPEndPoint endPoint, SocketTransportOptions? options = null, + ILoggerFactory? loggerFactory = null, CancellationToken token = default) + { + options ??= new SocketTransportOptions(); + loggerFactory ??= NullLoggerFactory.Instance; + + var factory = new SocketTransportFactory(Options.Create(options), loggerFactory); + return await factory.BindAsync(endPoint, token); + } +} diff --git a/Obsidian/Obsidian.csproj b/Obsidian/Obsidian.csproj index 261688201..0bd8fa4f2 100644 --- a/Obsidian/Obsidian.csproj +++ b/Obsidian/Obsidian.csproj @@ -54,6 +54,10 @@ + + + + diff --git a/Obsidian/Server.cs b/Obsidian/Server.cs index 890169b2d..548b956a3 100644 --- a/Obsidian/Server.cs +++ b/Obsidian/Server.cs @@ -1,3 +1,4 @@ +using Microsoft.AspNetCore.Connections; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Obsidian.API.Boss; @@ -12,6 +13,7 @@ using Obsidian.Entities; using Obsidian.Events; using Obsidian.Hosting; +using Obsidian.Net; using Obsidian.Net.Packets; using Obsidian.Net.Packets.Play.Clientbound; using Obsidian.Net.Packets.Play.Serverbound; @@ -56,10 +58,11 @@ public static string VERSION private readonly ConcurrentQueue _chatMessagesQueue = new(); private readonly ConcurrentHashSet _clients = new(); - private readonly TcpListener _tcpListener; private readonly RconServer _rconServer; private readonly ILogger _logger; + private IConnectionListener? _tcpListener; + public ProtocolVersion Protocol => DefaultProtocol; public int Tps { get; private set; } @@ -110,8 +113,6 @@ public Server( Port = Config.Port; ServerFolderPath = Directory.GetCurrentDirectory(); - _tcpListener = new TcpListener(IPAddress.Any, Port); - Operators = new OperatorList(this); _logger.LogDebug(message: "Initializing command handler..."); @@ -258,7 +259,7 @@ public async Task RunAsync() if (Config.MulitplayerDebugMode && Config.OnlineMode) { _logger.LogError("Incompatible Config: Multiplayer debug mode can't be enabled at the same time as online mode since usernames will be overwritten"); - Stop(); + await StopAsync(); return; } @@ -333,14 +334,21 @@ private async Task HandleServerShutdown() private async Task AcceptClientsAsync() { - _tcpListener.Start(); + _tcpListener = await SocketFactory.CreateListenerAsync(new IPEndPoint(IPAddress.Any, Port), token: _cancelTokenSource.Token); while (!_cancelTokenSource.Token.IsCancellationRequested) { - Socket socket; + ConnectionContext socket; try { - socket = await _tcpListener.AcceptSocketAsync(_cancelTokenSource.Token); + var connection = await _tcpListener.AcceptAsync(_cancelTokenSource.Token); + if (connection is null) + { + // No longer accepting clients. + break; + } + + socket = connection; } catch (OperationCanceledException) { @@ -360,7 +368,7 @@ private async Task AcceptClientsAsync() if (Config.IpWhitelistEnabled && !Config.WhitelistedIPs.Contains(ip)) { _logger.LogInformation("{ip} is not whitelisted. Closing connection", ip); - await socket.DisconnectAsync(false); + socket.Abort(); return; } @@ -390,7 +398,7 @@ private async Task AcceptClientsAsync() } _logger.LogInformation("No longer accepting new clients"); - _tcpListener.Stop(); + await _tcpListener.UnbindAsync(); } public IBossBar CreateBossBar(ChatMessage title, float health, BossBarColor color, BossBarDivisionType divisionType, BossBarFlags flags) => new BossBar(this) @@ -629,10 +637,15 @@ internal void BroadcastPlayerAction(PlayerActionStore store, IBlock block) } } - public void Stop() + public async Task StopAsync() { _cancelTokenSource.Cancel(); - _tcpListener.Stop(); + + if (_tcpListener is not null) + { + await _tcpListener.UnbindAsync(); + } + WorldGenerators.Clear(); foreach (var client in _clients) { diff --git a/Obsidian/Utilities/Extensions.cs b/Obsidian/Utilities/Extensions.cs index 5292f5294..fa123ee11 100644 --- a/Obsidian/Utilities/Extensions.cs +++ b/Obsidian/Utilities/Extensions.cs @@ -1,4 +1,5 @@ -using Obsidian.Entities; +using Microsoft.AspNetCore.Connections; +using Obsidian.Entities; using Obsidian.Registries; using System.Diagnostics.CodeAnalysis; using System.Globalization; @@ -84,6 +85,12 @@ public static void ForEach(this IEnumerable collection, Action action) } } + // Just to make stuff easier. + public static bool IsConnected(this ConnectionContext context) + { + return !context.ConnectionClosed.IsCancellationRequested; + } + // Derived from https://gist.github.com/ammaraskar/7b4a3f73bee9dc4136539644a0f27e63 [SuppressMessage("Roslyn", "CA5350", Justification = "SHA1 is required by the Minecraft protocol.")] public static string MinecraftShaDigest(this IEnumerable data)