Skip to content

Commit

Permalink
Fixing buffer msg boundry issue
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Feb 2, 2024
1 parent 54c007f commit cce3450
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 9 deletions.
30 changes: 21 additions & 9 deletions src/NATS.Client.Core/Commands/CommandWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ private static async Task ReaderLoopAsync(ILogger<CommandWriter> logger, ISocket
{
try
{
var examinedOffset = 0;
while (true)
{
var result = await pipeReader.ReadAsync(cancellationToken).ConfigureAwait(false);
Expand All @@ -321,7 +322,11 @@ private static async Task ReaderLoopAsync(ILogger<CommandWriter> logger, ISocket
}

var buffer = result.Buffer;
var completed = buffer.Start;
var consumed = buffer.Start;

buffer = buffer.Slice(examinedOffset);
var examined = buffer.Start;

try
{
if (!buffer.IsEmpty)
Expand All @@ -334,7 +339,6 @@ private static async Task ReaderLoopAsync(ILogger<CommandWriter> logger, ISocket

try
{
completed = buffer.Start;
var totalSent = 0;
var totalSize = 0;
while (totalSent < bufferLength)
Expand All @@ -346,20 +350,28 @@ private static async Task ReaderLoopAsync(ILogger<CommandWriter> logger, ISocket

while (totalSize < totalSent)
{
int size;
while (!channelSize.Reader.TryRead(out size))
int peek;
while (!channelSize.Reader.TryPeek(out peek))
{
await channelSize.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
}

// Don't just mark the message as complete if we have more data to send
if (totalSize + peek > totalSent)
{
break;
}

var size = await channelSize.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);

totalSize += size;
examinedOffset = 0;
}

// make sure to mark the buffer only at message boundaries.
// if there was a message sent only partially, we consider it sent even if it might not be
// due to a socket error in following socket send iteration. this is to avoid re-sending
// the same message again, ensuring at-most-once delivery.
completed = buffer.GetPosition(totalSize);
consumed = buffer.GetPosition(totalSize);
examined = buffer.GetPosition(totalSent);
examinedOffset += totalSent - totalSize;
}
}
finally
Expand All @@ -371,7 +383,7 @@ private static async Task ReaderLoopAsync(ILogger<CommandWriter> logger, ISocket
finally
{
// Always examine to the end to potentially unblock writer
pipeReader.AdvanceTo(completed, buffer.End);
pipeReader.AdvanceTo(consumed, examined);
}

if (result.IsCompleted)
Expand Down
2 changes: 2 additions & 0 deletions tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ public async Task Reconnect_doesnt_drop_partially_sent_msgs()
Assert.True(reconnects > 0, "connection did not reconnect");
Assert.True(received <= sent, $"duplicate messages sent on wire- {sent} sent, {received} received");

_output.WriteLine($"reconnects: {reconnects}, sent: {sent}, received: {received}");

// some messages may still be lost, as socket could have been disconnected
// after socket.WriteAsync returned, but before OS sent
// check to ensure that the loss was < 1%
Expand Down

0 comments on commit cce3450

Please sign in to comment.