diff --git a/src/NATS.Client.Core/Commands/CommandWriter.cs b/src/NATS.Client.Core/Commands/CommandWriter.cs index e211435e9..31cc150d1 100644 --- a/src/NATS.Client.Core/Commands/CommandWriter.cs +++ b/src/NATS.Client.Core/Commands/CommandWriter.cs @@ -1,7 +1,5 @@ using System.Buffers; using System.IO.Pipelines; -using System.Linq.Expressions; -using System.Net.Sockets; using System.Runtime.CompilerServices; using System.Threading.Channels; using Microsoft.Extensions.Logging; @@ -20,6 +18,9 @@ namespace NATS.Client.Core.Commands; /// internal sealed class CommandWriter : IAsyncDisposable { + // set to a reasonable socket write mem size + private const int MaxSendSize = 16384; + private readonly ILogger _logger; private readonly ObjectPool _pool; private readonly int _arrayPoolInitialSize; @@ -36,6 +37,7 @@ internal sealed class CommandWriter : IAsyncDisposable private readonly PipeReader _pipeReader; private readonly PipeWriter _pipeWriter; private ISocketConnection? _socketConnection; + private Task? _flushTask; private Task? _readerLoopTask; private CancellationTokenSource? _ctsReader; private volatile bool _disposed; @@ -150,16 +152,16 @@ public async ValueTask ConnectAsync(ClientOpts connectOpts, CancellationToken ca throw new ObjectDisposedException(nameof(CommandWriter)); } - _protocolWriter.WriteConnect(_pipeWriter, connectOpts); - - var size = (int)_pipeWriter.UnflushedBytes; - _channelSize.Writer.TryWrite(size); - - var result = await _pipeWriter.FlushAsync(cancellationTimer.Token).ConfigureAwait(false); - if (result.IsCanceled) + if (_flushTask is { IsCompletedSuccessfully: false }) { - throw new OperationCanceledException(); + await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false); } + + _protocolWriter.WriteConnect(_pipeWriter, connectOpts); + + _channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes); + var flush = _pipeWriter.FlushAsync(CancellationToken.None); + _flushTask = flush.IsCompletedSuccessfully ? 
null : flush.AsTask(); } finally { @@ -179,18 +181,17 @@ public async ValueTask PingAsync(PingCommand pingCommand, CancellationToken canc throw new ObjectDisposedException(nameof(CommandWriter)); } - _enqueuePing(pingCommand); + if (_flushTask is { IsCompletedSuccessfully: false }) + { + await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false); + } + _enqueuePing(pingCommand); _protocolWriter.WritePing(_pipeWriter); - var size = (int)_pipeWriter.UnflushedBytes; - _channelSize.Writer.TryWrite(size); - - var result = await _pipeWriter.FlushAsync(cancellationTimer.Token).ConfigureAwait(false); - if (result.IsCanceled) - { - throw new OperationCanceledException(); - } + _channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes); + var flush = _pipeWriter.FlushAsync(CancellationToken.None); + _flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask(); } finally { @@ -210,16 +211,16 @@ public async ValueTask PongAsync(CancellationToken cancellationToken = default) throw new ObjectDisposedException(nameof(CommandWriter)); } - _protocolWriter.WritePong(_pipeWriter); - - var size = (int)_pipeWriter.UnflushedBytes; - _channelSize.Writer.TryWrite(size); - - var result = await _pipeWriter.FlushAsync(cancellationTimer.Token).ConfigureAwait(false); - if (result.IsCanceled) + if (_flushTask is { IsCompletedSuccessfully: false }) { - throw new OperationCanceledException(); + await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false); } + + _protocolWriter.WritePong(_pipeWriter); + + _channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes); + var flush = _pipeWriter.FlushAsync(CancellationToken.None); + _flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask(); } finally { @@ -258,16 +259,16 @@ public async ValueTask SubscribeAsync(int sid, string subject, string? 
queueGrou throw new ObjectDisposedException(nameof(CommandWriter)); } - _protocolWriter.WriteSubscribe(_pipeWriter, sid, subject, queueGroup, maxMsgs); - - var size = (int)_pipeWriter.UnflushedBytes; - _channelSize.Writer.TryWrite(size); - - var result = await _pipeWriter.FlushAsync(cancellationTimer.Token).ConfigureAwait(false); - if (result.IsCanceled) + if (_flushTask is { IsCompletedSuccessfully: false }) { - throw new OperationCanceledException(); + await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false); } + + _protocolWriter.WriteSubscribe(_pipeWriter, sid, subject, queueGroup, maxMsgs); + + _channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes); + var flush = _pipeWriter.FlushAsync(CancellationToken.None); + _flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask(); } finally { @@ -287,16 +288,16 @@ public async ValueTask UnsubscribeAsync(int sid, int? maxMsgs, CancellationToken throw new ObjectDisposedException(nameof(CommandWriter)); } - _protocolWriter.WriteUnsubscribe(_pipeWriter, sid, maxMsgs); - - var size = (int)_pipeWriter.UnflushedBytes; - _channelSize.Writer.TryWrite(size); - - var result = await _pipeWriter.FlushAsync(cancellationTimer.Token).ConfigureAwait(false); - if (result.IsCanceled) + if (_flushTask is { IsCompletedSuccessfully: false }) { - throw new OperationCanceledException(); + await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false); } + + _protocolWriter.WriteUnsubscribe(_pipeWriter, sid, maxMsgs); + + _channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes); + var flush = _pipeWriter.FlushAsync(CancellationToken.None); + _flushTask = flush.IsCompletedSuccessfully ? 
null : flush.AsTask(); } finally { @@ -343,14 +344,26 @@ private static async Task ReaderLoopAsync(ILogger logger, ISocket var totalSize = 0; while (totalSent < bufferLength) { + var sendMemory = memory; + if (sendMemory.Length > MaxSendSize) + { + // cap the send size; the OS can only handle so much in a send buffer at a time + // also if the send fails, we have to throw this many bytes away + sendMemory = memory[..MaxSendSize]; + } + int sent; + Exception? sendEx = null; try { - sent = await connection.SendAsync(memory).ConfigureAwait(false); + sent = await connection.SendAsync(sendMemory).ConfigureAwait(false); } - catch (SocketException) + catch (Exception ex) { - break; + // we have no idea how many bytes were actually sent, so we have to assume they all were + // this could result in message loss, but is consistent with at-most-once delivery + sendEx = ex; + sent = sendMemory.Length; } totalSent += sent; @@ -361,6 +374,7 @@ private static async Task ReaderLoopAsync(ILogger logger, ISocket int peek; while (!channelSize.Reader.TryPeek(out peek)) { + // should never happen; channel sizes are written before flush is called await channelSize.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false); } @@ -370,7 +384,12 @@ private static async Task ReaderLoopAsync(ILogger logger, ISocket break; } - var size = await channelSize.Reader.ReadAsync(cancellationToken).ConfigureAwait(false); + int size; + while (!channelSize.Reader.TryRead(out size)) + { + // should never happen; channel sizes are written before flush is called (plus we just peeked) + await channelSize.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false); + } totalSize += size; examinedOffset = 0; @@ -380,6 +399,12 @@ private static async Task ReaderLoopAsync(ILogger logger, ISocket consumed = buffer.GetPosition(totalSize); examined = buffer.GetPosition(totalSent); examinedOffset += totalSent - totalSize; + + // throw if there was a send failure + if (sendEx != null) + { + throw sendEx; + 
} } } finally diff --git a/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs b/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs index dadd708a1..132b727c3 100644 --- a/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs +++ b/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs @@ -58,6 +58,7 @@ public async Task Retry_and_connect_after_disconnected() [Fact] public async Task Reconnect_doesnt_drop_partially_sent_msgs() { + const int msgSize = 1048576; // 1MiB await using var server = NatsServer.Start(); await using var pubConn = server.CreateClientConnection(); @@ -86,6 +87,7 @@ public async Task Reconnect_doesnt_drop_partially_sent_msgs() } else { + Assert.Equal(msgSize, msg.Data.Length); Interlocked.Increment(ref received); } } @@ -99,7 +101,7 @@ public async Task Reconnect_doesnt_drop_partially_sent_msgs() } var sent = 0; - var data = new byte[1048576]; // 1MiB + var data = new byte[msgSize]; var sendTask = Task.Run(async () => { while (!stopCts.IsCancellationRequested)