Skip to content

Commit

Permalink
throw away bytes in send buffer after a failed send (#368)
Browse files Browse the repository at this point in the history
Signed-off-by: Caleb Lloyd <[email protected]>
  • Loading branch information
caleblloyd authored Feb 2, 2024
1 parent ad5c0b2 commit 72f1213
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 48 deletions.
119 changes: 72 additions & 47 deletions src/NATS.Client.Core/Commands/CommandWriter.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System.Buffers;
using System.IO.Pipelines;
using System.Linq.Expressions;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
Expand All @@ -20,6 +18,9 @@ namespace NATS.Client.Core.Commands;
/// </remarks>
internal sealed class CommandWriter : IAsyncDisposable
{
// set to a reasonable socket write mem size
private const int MaxSendSize = 16384;

private readonly ILogger<CommandWriter> _logger;
private readonly ObjectPool _pool;
private readonly int _arrayPoolInitialSize;
Expand All @@ -36,6 +37,7 @@ internal sealed class CommandWriter : IAsyncDisposable
private readonly PipeReader _pipeReader;
private readonly PipeWriter _pipeWriter;
private ISocketConnection? _socketConnection;
private Task? _flushTask;
private Task? _readerLoopTask;
private CancellationTokenSource? _ctsReader;
private volatile bool _disposed;
Expand Down Expand Up @@ -150,16 +152,16 @@ public async ValueTask ConnectAsync(ClientOpts connectOpts, CancellationToken ca
throw new ObjectDisposedException(nameof(CommandWriter));
}

_protocolWriter.WriteConnect(_pipeWriter, connectOpts);

var size = (int)_pipeWriter.UnflushedBytes;
_channelSize.Writer.TryWrite(size);

var result = await _pipeWriter.FlushAsync(cancellationTimer.Token).ConfigureAwait(false);
if (result.IsCanceled)
if (_flushTask is { IsCompletedSuccessfully: false })
{
throw new OperationCanceledException();
await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false);
}

_protocolWriter.WriteConnect(_pipeWriter, connectOpts);

_channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes);
var flush = _pipeWriter.FlushAsync(CancellationToken.None);
_flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask();
}
finally
{
Expand All @@ -179,18 +181,17 @@ public async ValueTask PingAsync(PingCommand pingCommand, CancellationToken canc
throw new ObjectDisposedException(nameof(CommandWriter));
}

_enqueuePing(pingCommand);
if (_flushTask is { IsCompletedSuccessfully: false })
{
await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false);
}

_enqueuePing(pingCommand);
_protocolWriter.WritePing(_pipeWriter);

var size = (int)_pipeWriter.UnflushedBytes;
_channelSize.Writer.TryWrite(size);

var result = await _pipeWriter.FlushAsync(cancellationTimer.Token).ConfigureAwait(false);
if (result.IsCanceled)
{
throw new OperationCanceledException();
}
_channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes);
var flush = _pipeWriter.FlushAsync(CancellationToken.None);
_flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask();
}
finally
{
Expand All @@ -210,16 +211,16 @@ public async ValueTask PongAsync(CancellationToken cancellationToken = default)
throw new ObjectDisposedException(nameof(CommandWriter));
}

_protocolWriter.WritePong(_pipeWriter);

var size = (int)_pipeWriter.UnflushedBytes;
_channelSize.Writer.TryWrite(size);

var result = await _pipeWriter.FlushAsync(cancellationTimer.Token).ConfigureAwait(false);
if (result.IsCanceled)
if (_flushTask is { IsCompletedSuccessfully: false })
{
throw new OperationCanceledException();
await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false);
}

_protocolWriter.WritePong(_pipeWriter);

_channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes);
var flush = _pipeWriter.FlushAsync(CancellationToken.None);
_flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask();
}
finally
{
Expand Down Expand Up @@ -258,16 +259,16 @@ public async ValueTask SubscribeAsync(int sid, string subject, string? queueGrou
throw new ObjectDisposedException(nameof(CommandWriter));
}

_protocolWriter.WriteSubscribe(_pipeWriter, sid, subject, queueGroup, maxMsgs);

var size = (int)_pipeWriter.UnflushedBytes;
_channelSize.Writer.TryWrite(size);

var result = await _pipeWriter.FlushAsync(cancellationTimer.Token).ConfigureAwait(false);
if (result.IsCanceled)
if (_flushTask is { IsCompletedSuccessfully: false })
{
throw new OperationCanceledException();
await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false);
}

_protocolWriter.WriteSubscribe(_pipeWriter, sid, subject, queueGroup, maxMsgs);

_channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes);
var flush = _pipeWriter.FlushAsync(CancellationToken.None);
_flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask();
}
finally
{
Expand All @@ -287,16 +288,16 @@ public async ValueTask UnsubscribeAsync(int sid, int? maxMsgs, CancellationToken
throw new ObjectDisposedException(nameof(CommandWriter));
}

_protocolWriter.WriteUnsubscribe(_pipeWriter, sid, maxMsgs);

var size = (int)_pipeWriter.UnflushedBytes;
_channelSize.Writer.TryWrite(size);

var result = await _pipeWriter.FlushAsync(cancellationTimer.Token).ConfigureAwait(false);
if (result.IsCanceled)
if (_flushTask is { IsCompletedSuccessfully: false })
{
throw new OperationCanceledException();
await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false);
}

_protocolWriter.WriteUnsubscribe(_pipeWriter, sid, maxMsgs);

_channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes);
var flush = _pipeWriter.FlushAsync(CancellationToken.None);
_flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask();
}
finally
{
Expand Down Expand Up @@ -343,14 +344,26 @@ private static async Task ReaderLoopAsync(ILogger<CommandWriter> logger, ISocket
var totalSize = 0;
while (totalSent < bufferLength)
{
var sendMemory = memory;
if (sendMemory.Length > MaxSendSize)
{
// cap the send size, the OS can only handle so much in a send buffer at a time
// also if the send fails, we have to throw this many bytes away
sendMemory = memory[..MaxSendSize];
}

int sent;
Exception? sendEx = null;
try
{
sent = await connection.SendAsync(memory).ConfigureAwait(false);
sent = await connection.SendAsync(sendMemory).ConfigureAwait(false);
}
catch (SocketException)
catch (Exception ex)
{
break;
// we have no idea how many bytes were actually sent, so we have to assume they all were
// this could result in message loss, but is consistent with at-most once delivery
sendEx = ex;
sent = sendMemory.Length;
}

totalSent += sent;
Expand All @@ -361,6 +374,7 @@ private static async Task ReaderLoopAsync(ILogger<CommandWriter> logger, ISocket
int peek;
while (!channelSize.Reader.TryPeek(out peek))
{
// should never happen; channel sizes are written before flush is called
await channelSize.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
}

Expand All @@ -370,7 +384,12 @@ private static async Task ReaderLoopAsync(ILogger<CommandWriter> logger, ISocket
break;
}

var size = await channelSize.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
int size;
while (!channelSize.Reader.TryRead(out size))
{
// should never happen; channel sizes are written before flush is called (plus we just peeked)
await channelSize.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
}

totalSize += size;
examinedOffset = 0;
Expand All @@ -380,6 +399,12 @@ private static async Task ReaderLoopAsync(ILogger<CommandWriter> logger, ISocket
consumed = buffer.GetPosition(totalSize);
examined = buffer.GetPosition(totalSent);
examinedOffset += totalSent - totalSize;

// throw if there was a send failure
if (sendEx != null)
{
throw sendEx;
}
}
}
finally
Expand Down
4 changes: 3 additions & 1 deletion tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public async Task Retry_and_connect_after_disconnected()
[Fact]
public async Task Reconnect_doesnt_drop_partially_sent_msgs()
{
const int msgSize = 1048576; // 1MiB
await using var server = NatsServer.Start();

await using var pubConn = server.CreateClientConnection();
Expand Down Expand Up @@ -86,6 +87,7 @@ public async Task Reconnect_doesnt_drop_partially_sent_msgs()
}
else
{
Assert.Equal(msgSize, msg.Data.Length);
Interlocked.Increment(ref received);
}
}
Expand All @@ -99,7 +101,7 @@ public async Task Reconnect_doesnt_drop_partially_sent_msgs()
}

var sent = 0;
var data = new byte[1048576]; // 1MiB
var data = new byte[msgSize];
var sendTask = Task.Run(async () =>
{
while (!stopCts.IsCancellationRequested)
Expand Down

0 comments on commit 72f1213

Please sign in to comment.