Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

新增两个 send 方法 #599

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/SuperSocket.Channel/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public abstract class ChannelBase<TPackageInfo> : IChannel<TPackageInfo>, IChann

public abstract ValueTask SendAsync(Action<PipeWriter> write);

public abstract ValueTask<int> Send(ReadOnlyMemory<byte> buffer);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Send => SendAsync?


public abstract ValueTask<int> Send<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Send => SendAsync?


public bool IsClosed { get; private set; }

public EndPoint RemoteEndPoint { get; protected set; }
Expand Down
5 changes: 5 additions & 0 deletions src/SuperSocket.Channel/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ public interface IChannel

ValueTask SendAsync(Action<PipeWriter> write);

ValueTask<int> Send(ReadOnlyMemory<byte> data);

ValueTask<int> Send<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package);


Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra line?>

ValueTask CloseAsync(CloseReason closeReason);

event EventHandler<CloseEventArgs> Closed;
Expand Down
14 changes: 14 additions & 0 deletions src/SuperSocket.Channel/PipeChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,20 @@ public override async ValueTask SendAsync(Action<PipeWriter> write)
}
}

public override async ValueTask<int> Send(ReadOnlyMemory<byte> buffer)
{
return await SendOverIOAsync(new ReadOnlySequence<byte>(buffer), CancellationToken.None);
}

public override async ValueTask<int> Send<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package)
{
ArrayBufferWriter<byte> arrayBufferWriter = new ArrayBufferWriter<byte>();

WritePackageWithEncoder(arrayBufferWriter, packageEncoder, package);

return await SendOverIOAsync(new ReadOnlySequence<byte>(arrayBufferWriter.WrittenMemory), CancellationToken.None);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra space.

}

protected void WritePackageWithEncoder<TPackage>(IBufferWriter<byte> writer, IPackageEncoder<TPackage> packageEncoder, TPackage package)
{
CheckChannelOpen();
Expand Down
2 changes: 1 addition & 1 deletion src/SuperSocket.Channel/TcpPipeChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ protected override async ValueTask<int> SendOverIOAsync(ReadOnlySequence<byte> b
_segmentsForSend.Clear();
}

var segments = _segmentsForSend;
//var segments = _segmentsForSend;

foreach (var piece in buffer)
{
Expand Down
12 changes: 6 additions & 6 deletions src/SuperSocket.Channel/UdpPipeChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ namespace SuperSocket.Channel
{
public class UdpPipeChannel<TPackageInfo> : VirtualChannel<TPackageInfo>, IChannelWithSessionIdentifier
{
private Socket _socket;
private readonly Socket _socket;

private bool _enableSendingPipe;
private readonly bool _enableSendingPipe;

public UdpPipeChannel(Socket socket, IPipelineFilter<TPackageInfo> pipelineFilter, ChannelOptions options, IPEndPoint remoteEndPoint)
: this(socket, pipelineFilter, options, remoteEndPoint, $"{remoteEndPoint.Address}:{remoteEndPoint.Port}")
Expand Down Expand Up @@ -60,8 +60,8 @@ protected override async ValueTask<int> SendOverIOAsync(ReadOnlySequence<byte> b
var destBuffer = pool.Rent((int)buffer.Length);

try
{
MergeBuffer(ref buffer, destBuffer);
{
UdpPipeChannel<TPackageInfo>.MergeBuffer(ref buffer, destBuffer);
return await _socket.SendToAsync(new ArraySegment<byte>(destBuffer, 0, (int)buffer.Length), SocketFlags.None, RemoteEndPoint);
}
finally
Expand All @@ -78,7 +78,7 @@ protected override Task ProcessSends()
return Task.CompletedTask;
}

private void MergeBuffer(ref ReadOnlySequence<byte> buffer, byte[] destBuffer)
private static void MergeBuffer(ref ReadOnlySequence<byte> buffer, byte[] destBuffer)
{
Span<byte> destSpan = destBuffer;

Expand All @@ -88,7 +88,7 @@ private void MergeBuffer(ref ReadOnlySequence<byte> buffer, byte[] destBuffer)
{
piece.Span.CopyTo(destSpan);
total += piece.Length;
destSpan = destSpan.Slice(piece.Length);
destSpan = destSpan[piece.Length..];
}
}

Expand Down
20 changes: 12 additions & 8 deletions src/SuperSocket.Client/EasyClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,8 @@ public EasyClient(IPipelineFilter<TReceivePackage> pipelineFilter, ILogger logge

public EasyClient(IPipelineFilter<TReceivePackage> pipelineFilter, ChannelOptions options)
{
if (pipelineFilter == null)
throw new ArgumentNullException(nameof(pipelineFilter));

if (options == null)
throw new ArgumentNullException(nameof(options));

_pipelineFilter = pipelineFilter;
Options = options;
_pipelineFilter = pipelineFilter ?? throw new ArgumentNullException(nameof(pipelineFilter));
Options = options ?? throw new ArgumentNullException(nameof(options));
Logger = options.Logger;
}

Expand Down Expand Up @@ -318,6 +312,16 @@ protected virtual async ValueTask SendAsync<TSendPackage>(IPackageEncoder<TSendP
await Channel.SendAsync(packageEncoder, package);
}

public ValueTask<int> Send(ReadOnlyMemory<byte> data)
{
return Channel.Send(data);
}

public ValueTask<int> Send<TSendPackage>(IPackageEncoder<TSendPackage> packageEncoder, TSendPackage package)
{
return Channel.Send<TSendPackage>(packageEncoder, package);
}

public event EventHandler Closed;

public virtual async ValueTask CloseAsync()
Expand Down
4 changes: 4 additions & 0 deletions src/SuperSocket.Client/IEasyClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public interface IEasyClient<TReceivePackage>

ValueTask SendAsync<TSendPackage>(IPackageEncoder<TSendPackage> packageEncoder, TSendPackage package);

ValueTask<int> Send(ReadOnlyMemory<byte> data);

ValueTask<int> Send<TSendPackage>(IPackageEncoder<TSendPackage> packageEncoder, TSendPackage package);

event EventHandler Closed;

event PackageHandler<TReceivePackage> PackageHandler;
Expand Down