Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to use Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets #387

Merged
merged 3 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions Obsidian/Client.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.Logging;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Logging;
using Obsidian.API.Events;
using Obsidian.Concurrency;
using Obsidian.Entities;
Expand All @@ -10,7 +11,6 @@
using Obsidian.Net.Packets.Login;
using Obsidian.Net.Packets.Play;
using Obsidian.Net.Packets.Play.Clientbound;
using Obsidian.Net.Packets.Play.Serverbound;
using Obsidian.Net.Packets.Status;
using Obsidian.Registries;
using Obsidian.Utilities.Mojang;
Expand Down Expand Up @@ -103,17 +103,17 @@ public sealed class Client : IDisposable
/// <summary>
/// The base network stream used by the <see cref="minecraftStream"/>.
/// </summary>
private readonly NetworkStream networkStream;
private readonly DuplexPipeStream networkStream;

/// <summary>
/// Used to continuously send and receive encrypted packets from the client.
/// </summary>
private readonly PacketCryptography packetCryptography;

/// <summary>
/// The socket associated with the <see cref="networkStream"/>.
/// The connection context associated with the <see cref="networkStream"/>.
/// </summary>
private readonly Socket socket;
private readonly ConnectionContext connectionContext;

/// <summary>
/// The current server configuration.
Expand All @@ -138,7 +138,7 @@ public sealed class Client : IDisposable
/// <summary>
/// The client's ip and port used to establish this connection.
/// </summary>
public EndPoint? RemoteEndPoint => socket.RemoteEndPoint;
public EndPoint? RemoteEndPoint => connectionContext.RemoteEndPoint;

/// <summary>
/// Executed when the client disconnects.
Expand All @@ -165,25 +165,25 @@ public sealed class Client : IDisposable
/// </summary>
public string? Brand { get; set; }

public Client(Socket socket, ServerConfiguration config, int playerId, Server originServer)
public Client(ConnectionContext connectionContext, ServerConfiguration config, int playerId, Server originServer)
{
this.socket = socket;
this.connectionContext = connectionContext;
this.config = config;
id = playerId;
Server = originServer;

LoadedChunks = new();
packetCryptography = new();
handler = new(config);
networkStream = new(socket);
networkStream = new(connectionContext.Transport);
minecraftStream = new(networkStream);

missedKeepAlives = new List<long>();
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
var blockOptions = new ExecutionDataflowBlockOptions { CancellationToken = cancellationSource.Token, EnsureOrdered = true };
var sendPacketBlock = new ActionBlock<IClientboundPacket>(packet =>
{
if (socket.Connected)
if (connectionContext.IsConnected())
SendPacket(packet);
}, blockOptions);

Expand Down Expand Up @@ -227,7 +227,7 @@ public Client(Socket socket, ServerConfiguration config, int playerId, Server or

public async Task StartConnectionAsync()
{
while (!cancellationSource.IsCancellationRequested && socket.Connected)
while (!cancellationSource.IsCancellationRequested && connectionContext.IsConnected())
{
(var id, var data) = await GetNextPacketAsync();

Expand Down Expand Up @@ -265,7 +265,7 @@ public async Task StartConnectionAsync()
{
if (this.Server.Config.CanThrottle)
{
string ip = ((IPEndPoint)socket.RemoteEndPoint!).Address.ToString();
string ip = ((IPEndPoint)connectionContext.RemoteEndPoint!).Address.ToString();

if (Server.throttler.TryGetValue(ip, out var timeLeft))
{
Expand Down Expand Up @@ -712,7 +712,7 @@ internal void SendPacket(IClientboundPacket packet)
catch (SocketException)
{
// Clients can disconnect at any point, causing exception to be raised
if (!socket.Connected)
if (!connectionContext.IsConnected())
{
Disconnect();
}
Expand Down Expand Up @@ -778,7 +778,7 @@ public void Dispose()
disposed = true;

minecraftStream.Dispose();
socket.Dispose();
connectionContext.Abort();
cancellationSource?.Dispose();

GC.SuppressFinalize(this);
Expand Down
6 changes: 2 additions & 4 deletions Obsidian/Commands/MainCommandModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,12 @@ public async Task SpawnEntityAsync(CommandContext context, string entityType)
[Command("stop")]
[CommandInfo("Stops the server.", "/stop")]
[RequirePermission(permissions: "obsidian.stop")]
public Task StopAsync(CommandContext ctx)
public async Task StopAsync(CommandContext ctx)
{
var server = (Server)ctx.Server;
server.BroadcastMessage($"Stopping server...");

server.Stop();

return Task.CompletedTask;
await server.StopAsync();
}

[Command("time")]
Expand Down
139 changes: 139 additions & 0 deletions Obsidian/Net/DuplexPipeStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Threading;

namespace Obsidian.Net;

// https://raw.githubusercontent.com/StarlkYT/BedrockFramework/main/src/Bedrock.Framework/Infrastructure/DuplexPipeStream.cs
// With some small changes.
internal sealed class DuplexPipeStream : Stream
{
private readonly PipeReader _input;
private readonly PipeWriter _output;
private readonly bool _throwOnCancelled;
private volatile bool _cancelCalled;

public DuplexPipeStream(IDuplexPipe pipe, bool throwOnCancelled = false)
{
_input = pipe.Input;
_output = pipe.Output;
_throwOnCancelled = throwOnCancelled;
}

public void CancelPendingRead()
{
_cancelCalled = true;
_input.CancelPendingRead();
}

public override bool CanRead => true;

public override bool CanSeek => false;

public override bool CanWrite => true;

public override long Length => throw new NotSupportedException();

public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}

public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}

public override void SetLength(long value)
{
throw new NotSupportedException();
}

public override int Read(byte[] buffer, int offset, int count)
{
// ValueTask uses .GetAwaiter().GetResult() if necessary
// https://github.com/dotnet/corefx/blob/f9da3b4af08214764a51b2331f3595ffaf162abe/src/System.Threading.Tasks.Extensions/src/System/Threading/Tasks/ValueTask.cs#L156
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), default).Result;
}

public override Task<int> ReadAsync(byte[] buffer, int offset, int count,
CancellationToken cancellationToken)
{
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
}

public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
{
return ReadAsyncInternal(destination, cancellationToken);
}

public override void Write(byte[] buffer, int offset, int count)
{
WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
}

public async override Task WriteAsync(byte[]? buffer, int offset, int count, CancellationToken cancellationToken)
{
if (buffer != null)
{
_output.Write(new ReadOnlySpan<byte>(buffer, offset, count));
}

await _output.FlushAsync(cancellationToken).ConfigureAwait(false);
}

public async override ValueTask WriteAsync(ReadOnlyMemory<byte> source,
CancellationToken cancellationToken = default)
{
_output.Write(source.Span);
await _output.FlushAsync(cancellationToken).ConfigureAwait(false);
}

public override void Flush()
{
FlushAsync(CancellationToken.None).GetAwaiter().GetResult();
}

public override Task FlushAsync(CancellationToken cancellationToken)
{
return WriteAsync(null, 0, 0, cancellationToken);
}

private async ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken)
{
while (true)
{
var result = await _input.ReadAsync(cancellationToken).ConfigureAwait(false);
var readableBuffer = result.Buffer;
try
{
if (_throwOnCancelled && result.IsCanceled && _cancelCalled)
{
// Reset the bool
_cancelCalled = false;
throw new OperationCanceledException();
}

if (!readableBuffer.IsEmpty)
{
// buffer.Count is int
var count = (int)Math.Min(readableBuffer.Length, destination.Length);
readableBuffer = readableBuffer.Slice(0, count);
readableBuffer.CopyTo(destination.Span);
return count;
}

if (result.IsCompleted)
{
return 0;
}
}
finally
{
_input.AdvanceTo(readableBuffer.End, readableBuffer.End);
}
}
}
}
22 changes: 22 additions & 0 deletions Obsidian/Net/SocketFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using System.Net;
using System.Threading;

namespace Obsidian.Net;

internal static class SocketFactory
{
public static async Task<IConnectionListener> CreateListenerAsync(IPEndPoint endPoint, SocketTransportOptions? options = null,
ILoggerFactory? loggerFactory = null, CancellationToken token = default)
{
options ??= new SocketTransportOptions();
loggerFactory ??= NullLoggerFactory.Instance;

var factory = new SocketTransportFactory(Options.Create(options), loggerFactory);
return await factory.BindAsync(endPoint, token);
}
}
4 changes: 4 additions & 0 deletions Obsidian/Obsidian.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
<Compile Remove="Serializer\Enums\FieldType.cs" />
</ItemGroup>

<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="BouncyCastle.NetCoreSdk" Version="1.9.7" />
<PackageReference Include="Microsoft.CodeAnalysis.CSharp" Version="4.4.0" />
Expand Down
Loading