diff --git a/src/NATS.Client.Core/Commands/CommandWriter.cs b/src/NATS.Client.Core/Commands/CommandWriter.cs index 98f887615..4352e974f 100644 --- a/src/NATS.Client.Core/Commands/CommandWriter.cs +++ b/src/NATS.Client.Core/Commands/CommandWriter.cs @@ -311,6 +311,7 @@ private static async Task ReaderLoopAsync(ILogger logger, ISocket { try { + var examinedOffset = 0; while (true) { var result = await pipeReader.ReadAsync(cancellationToken).ConfigureAwait(false); @@ -321,7 +322,11 @@ private static async Task ReaderLoopAsync(ILogger logger, ISocket } var buffer = result.Buffer; - var completed = buffer.Start; + var consumed = buffer.Start; + + buffer = buffer.Slice(examinedOffset); + var examined = buffer.Start; + try { if (!buffer.IsEmpty) @@ -334,7 +339,6 @@ private static async Task ReaderLoopAsync(ILogger logger, ISocket try { - completed = buffer.Start; var totalSent = 0; var totalSize = 0; while (totalSent < bufferLength) @@ -346,20 +350,28 @@ private static async Task ReaderLoopAsync(ILogger logger, ISocket while (totalSize < totalSent) { - int size; - while (!channelSize.Reader.TryRead(out size)) + int peek; + while (!channelSize.Reader.TryPeek(out peek)) { await channelSize.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false); } + // Don't just mark the message as complete if we have more data to send + if (totalSize + peek > totalSent) + { + break; + } + + var size = await channelSize.Reader.ReadAsync(cancellationToken).ConfigureAwait(false); + totalSize += size; + examinedOffset = 0; } // make sure to mark the buffer only at message boundaries. - // if there was a message sent only partially, we consider it sent even if it might not be - // due to a socket error in following socket send iteration. this is to avoid re-sending - // the same message again, ensuring at-most-once delivery. - completed = buffer.GetPosition(totalSize); + consumed = buffer.GetPosition(totalSize); + examined = buffer.GetPosition(totalSent); + examinedOffset += totalSent - totalSize; } } finally @@ -371,7 +383,7 @@ private static async Task ReaderLoopAsync(ILogger logger, ISocket finally { // Always examine to the end to potentially unblock writer - pipeReader.AdvanceTo(completed, buffer.End); + pipeReader.AdvanceTo(consumed, examined); } if (result.IsCompleted) diff --git a/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs b/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs index 261337c60..dadd708a1 100644 --- a/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs +++ b/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs @@ -131,6 +131,8 @@ public async Task Reconnect_doesnt_drop_partially_sent_msgs() Assert.True(reconnects > 0, "connection did not reconnect"); Assert.True(received <= sent, $"duplicate messages sent on wire- {sent} sent, {received} received"); + _output.WriteLine($"reconnects: {reconnects}, sent: {sent}, received: {received}"); + // some messages may still be lost, as socket could have been disconnected // after socket.WriteAsync returned, but before OS sent // check to ensure that the loss was < 1%