Skip to content

Commit

Permalink
Migrate to use Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
TheVeryStarlk committed Aug 14, 2023
1 parent ca25dca commit 283244d
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 25 deletions.
19 changes: 10 additions & 9 deletions Obsidian/Client.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.Logging;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Logging;
using Obsidian.API.Events;
using Obsidian.Concurrency;
using Obsidian.Entities;
Expand Down Expand Up @@ -103,7 +104,7 @@ public sealed class Client : IDisposable
/// <summary>
/// The base network stream used by the <see cref="minecraftStream"/>.
/// </summary>
private readonly NetworkStream networkStream;
private readonly DuplexPipeStream networkStream;

/// <summary>
/// Used to continuously send and receive encrypted packets from the client.
Expand All @@ -113,7 +114,7 @@ public sealed class Client : IDisposable
/// <summary>
/// The socket associated with the <see cref="networkStream"/>.
/// </summary>
private readonly Socket socket;
private readonly ConnectionContext socket;

/// <summary>
/// The current server configuration.
Expand Down Expand Up @@ -165,7 +166,7 @@ public sealed class Client : IDisposable
/// </summary>
public string? Brand { get; set; }

public Client(Socket socket, ServerConfiguration config, int playerId, Server originServer)
public Client(ConnectionContext socket, ServerConfiguration config, int playerId, Server originServer)
{
this.socket = socket;
this.config = config;
Expand All @@ -175,15 +176,15 @@ public Client(Socket socket, ServerConfiguration config, int playerId, Server or
LoadedChunks = new();
packetCryptography = new();
handler = new(config);
networkStream = new(socket);
networkStream = new(socket.Transport);
minecraftStream = new(networkStream);

missedKeepAlives = new List<long>();
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
var blockOptions = new ExecutionDataflowBlockOptions { CancellationToken = cancellationSource.Token, EnsureOrdered = true };
var sendPacketBlock = new ActionBlock<IClientboundPacket>(packet =>
{
if (socket.Connected)
if (socket.IsConnected())
SendPacket(packet);
}, blockOptions);

Expand Down Expand Up @@ -227,7 +228,7 @@ public Client(Socket socket, ServerConfiguration config, int playerId, Server or

public async Task StartConnectionAsync()
{
while (!cancellationSource.IsCancellationRequested && socket.Connected)
while (!cancellationSource.IsCancellationRequested && socket.IsConnected())
{
(var id, var data) = await GetNextPacketAsync();

Expand Down Expand Up @@ -712,7 +713,7 @@ internal void SendPacket(IClientboundPacket packet)
catch (SocketException)
{
// Clients can disconnect at any point, causing exception to be raised
if (!socket.Connected)
if (!socket.IsConnected())
{
Disconnect();
}
Expand Down Expand Up @@ -778,7 +779,7 @@ public void Dispose()
disposed = true;

minecraftStream.Dispose();
socket.Dispose();
socket.Abort();
cancellationSource?.Dispose();

GC.SuppressFinalize(this);
Expand Down
6 changes: 2 additions & 4 deletions Obsidian/Commands/MainCommandModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,12 @@ public async Task SpawnEntityAsync(CommandContext context, string entityType)
[Command("stop")]
[CommandInfo("Stops the server.", "/stop")]
[RequirePermission(permissions: "obsidian.stop")]
public Task StopAsync(CommandContext ctx)
public async Task StopAsync(CommandContext ctx)
{
var server = (Server)ctx.Server;
server.BroadcastMessage($"Stopping server...");

server.Stop();

return Task.CompletedTask;
await server.StopAsync();
}

[Command("time")]
Expand Down
139 changes: 139 additions & 0 deletions Obsidian/Net/DuplexPipeStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Threading;

namespace Obsidian.Net;

// https://raw.githubusercontent.com/StarlkYT/BedrockFramework/main/src/Bedrock.Framework/Infrastructure/DuplexPipeStream.cs
// With some small changes.
internal sealed class DuplexPipeStream : Stream
{
private readonly PipeReader _input;
private readonly PipeWriter _output;
private readonly bool _throwOnCancelled;
private volatile bool _cancelCalled;

public DuplexPipeStream(IDuplexPipe pipe, bool throwOnCancelled = false)
{
_input = pipe.Input;
_output = pipe.Output;
_throwOnCancelled = throwOnCancelled;
}

public void CancelPendingRead()
{
_cancelCalled = true;
_input.CancelPendingRead();
}

public override bool CanRead => true;

public override bool CanSeek => false;

public override bool CanWrite => true;

public override long Length => throw new NotSupportedException();

public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}

public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}

public override void SetLength(long value)
{
throw new NotSupportedException();
}

public override int Read(byte[] buffer, int offset, int count)
{
// ValueTask uses .GetAwaiter().GetResult() if necessary
// https://github.com/dotnet/corefx/blob/f9da3b4af08214764a51b2331f3595ffaf162abe/src/System.Threading.Tasks.Extensions/src/System/Threading/Tasks/ValueTask.cs#L156
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), default).Result;
}

public override Task<int> ReadAsync(byte[] buffer, int offset, int count,
CancellationToken cancellationToken)
{
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
}

public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
{
return ReadAsyncInternal(destination, cancellationToken);
}

public override void Write(byte[] buffer, int offset, int count)
{
WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
}

public async override Task WriteAsync(byte[]? buffer, int offset, int count, CancellationToken cancellationToken)
{
if (buffer != null)
{
_output.Write(new ReadOnlySpan<byte>(buffer, offset, count));
}

await _output.FlushAsync(cancellationToken).ConfigureAwait(false);
}

public async override ValueTask WriteAsync(ReadOnlyMemory<byte> source,
CancellationToken cancellationToken = default)
{
_output.Write(source.Span);
await _output.FlushAsync(cancellationToken).ConfigureAwait(false);
}

public override void Flush()
{
FlushAsync(CancellationToken.None).GetAwaiter().GetResult();
}

public override Task FlushAsync(CancellationToken cancellationToken)
{
return WriteAsync(null, 0, 0, cancellationToken);
}

private async ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken)
{
while (true)
{
var result = await _input.ReadAsync(cancellationToken).ConfigureAwait(false);
var readableBuffer = result.Buffer;
try
{
if (_throwOnCancelled && result.IsCanceled && _cancelCalled)
{
// Reset the bool
_cancelCalled = false;
throw new OperationCanceledException();
}

if (!readableBuffer.IsEmpty)
{
// buffer.Count is int
var count = (int)Math.Min(readableBuffer.Length, destination.Length);
readableBuffer = readableBuffer.Slice(0, count);
readableBuffer.CopyTo(destination.Span);
return count;
}

if (result.IsCompleted)
{
return 0;
}
}
finally
{
_input.AdvanceTo(readableBuffer.End, readableBuffer.End);
}
}
}
}
22 changes: 22 additions & 0 deletions Obsidian/Net/SocketFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using System.Net;
using System.Threading;

namespace Obsidian.Net;

internal static class SocketFactory
{
public static async Task<IConnectionListener> CreateListenerAsync(IPEndPoint endPoint, SocketTransportOptions? options = null,
ILoggerFactory? loggerFactory = null, CancellationToken token = default)
{
options ??= new SocketTransportOptions();
loggerFactory ??= NullLoggerFactory.Instance;

var factory = new SocketTransportFactory(Options.Create(options), loggerFactory);
return await factory.BindAsync(endPoint, token);
}
}
4 changes: 4 additions & 0 deletions Obsidian/Obsidian.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
<Compile Remove="Serializer\Enums\FieldType.cs" />
</ItemGroup>

<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="BouncyCastle.NetCoreSdk" Version="1.9.7" />
<PackageReference Include="Microsoft.CodeAnalysis.CSharp" Version="4.4.0" />
Expand Down
35 changes: 24 additions & 11 deletions Obsidian/Server.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Obsidian.API.Boss;
Expand All @@ -12,6 +13,7 @@
using Obsidian.Entities;
using Obsidian.Events;
using Obsidian.Hosting;
using Obsidian.Net;
using Obsidian.Net.Packets;
using Obsidian.Net.Packets.Play.Clientbound;
using Obsidian.Net.Packets.Play.Serverbound;
Expand Down Expand Up @@ -56,10 +58,11 @@ public static string VERSION

private readonly ConcurrentQueue<IClientboundPacket> _chatMessagesQueue = new();
private readonly ConcurrentHashSet<Client> _clients = new();
private readonly TcpListener _tcpListener;
private readonly RconServer _rconServer;
private readonly ILogger _logger;

private IConnectionListener? _tcpListener;

public ProtocolVersion Protocol => DefaultProtocol;

public int Tps { get; private set; }
Expand Down Expand Up @@ -110,8 +113,6 @@ public Server(
Port = Config.Port;
ServerFolderPath = Directory.GetCurrentDirectory();

_tcpListener = new TcpListener(IPAddress.Any, Port);

Operators = new OperatorList(this);

_logger.LogDebug(message: "Initializing command handler...");
Expand Down Expand Up @@ -258,7 +259,7 @@ public async Task RunAsync()
if (Config.MulitplayerDebugMode && Config.OnlineMode)
{
_logger.LogError("Incompatible Config: Multiplayer debug mode can't be enabled at the same time as online mode since usernames will be overwritten");
Stop();
await StopAsync();
return;
}

Expand Down Expand Up @@ -333,14 +334,21 @@ private async Task HandleServerShutdown()

private async Task AcceptClientsAsync()
{
_tcpListener.Start();
_tcpListener = await SocketFactory.CreateListenerAsync(new IPEndPoint(IPAddress.Any, Port), token: _cancelTokenSource.Token);

while (!_cancelTokenSource.Token.IsCancellationRequested)
{
Socket socket;
ConnectionContext socket;
try
{
socket = await _tcpListener.AcceptSocketAsync(_cancelTokenSource.Token);
var connection = await _tcpListener.AcceptAsync(_cancelTokenSource.Token);
if (connection is null)
{
// No longer accepting clients.
break;
}

socket = connection;
}
catch (OperationCanceledException)
{
Expand All @@ -360,7 +368,7 @@ private async Task AcceptClientsAsync()
if (Config.IpWhitelistEnabled && !Config.WhitelistedIPs.Contains(ip))
{
_logger.LogInformation("{ip} is not whitelisted. Closing connection", ip);
await socket.DisconnectAsync(false);
socket.Abort();
return;
}

Expand Down Expand Up @@ -390,7 +398,7 @@ private async Task AcceptClientsAsync()
}

_logger.LogInformation("No longer accepting new clients");
_tcpListener.Stop();
await _tcpListener.UnbindAsync();
}

public IBossBar CreateBossBar(ChatMessage title, float health, BossBarColor color, BossBarDivisionType divisionType, BossBarFlags flags) => new BossBar(this)
Expand Down Expand Up @@ -629,10 +637,15 @@ internal void BroadcastPlayerAction(PlayerActionStore store, IBlock block)
}
}

public void Stop()
public async Task StopAsync()
{
_cancelTokenSource.Cancel();
_tcpListener.Stop();

if (_tcpListener is not null)
{
await _tcpListener.UnbindAsync();
}

WorldGenerators.Clear();
foreach (var client in _clients)
{
Expand Down
9 changes: 8 additions & 1 deletion Obsidian/Utilities/Extensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Obsidian.Entities;
using Microsoft.AspNetCore.Connections;
using Obsidian.Entities;
using Obsidian.Registries;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
Expand Down Expand Up @@ -84,6 +85,12 @@ public static void ForEach<T>(this IEnumerable<T> collection, Action<T> action)
}
}

// Just to make stuff easier.
public static bool IsConnected(this ConnectionContext context)
{
return !context.ConnectionClosed.IsCancellationRequested;
}

// Derived from https://gist.github.com/ammaraskar/7b4a3f73bee9dc4136539644a0f27e63
[SuppressMessage("Roslyn", "CA5350", Justification = "SHA1 is required by the Minecraft protocol.")]
public static string MinecraftShaDigest(this IEnumerable<byte> data)
Expand Down

0 comments on commit 283244d

Please sign in to comment.