From e150869c50f1bbe05f9e1e911114f2816ffde9d9 Mon Sep 17 00:00:00 2001 From: Caleb Lloyd Date: Fri, 26 Jan 2024 10:26:38 -0500 Subject: [PATCH 1/3] test reconnect doesn't drop partially sent msgs Signed-off-by: Caleb Lloyd --- src/NATS.Client.Core/NatsConnection.cs | 3 + .../ConnectionRetryTest.cs | 77 +++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index fac33fc07..05c311839 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -133,6 +133,9 @@ public NatsConnectionState ConnectionState internal ObjectPool ObjectPool => _pool; + // only used for internal testing + internal ISocketConnection TestSocket => _socket; + /// /// Connect socket and write CONNECT command to nats server. /// diff --git a/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs b/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs index dc1107b31..23c12232d 100644 --- a/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs +++ b/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs @@ -64,4 +64,81 @@ public async Task Retry_and_connect_after_disconnected() var rtt = await nats.PingAsync(cts.Token); Assert.True(rtt > TimeSpan.Zero); } + + [Fact] + public async Task Reconnect_doesnt_drop_partially_sent_msgs() + { + await using var server = NatsServer.Start(); + + await using var pubConn = server.CreateClientConnection(); + await pubConn.ConnectAsync(); + + var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + var received = 0; + var subActive = 0; + var subTask = Task.Run(async () => + { + await using var subConn = server.CreateClientConnection(); + await using var sub = await subConn.SubscribeCoreAsync>("test", cancellationToken: timeoutCts.Token); + await foreach (var msg in sub.Msgs.ReadAllAsync(timeoutCts.Token)) + { + using (msg.Data) + { + if (msg.Data.Length == 1) + { + Interlocked.Increment(ref subActive); + } + else if (msg.Data.Length == 2) + { + break; + } + else + { + Interlocked.Increment(ref received); + } + } + } + }); + + while (Interlocked.CompareExchange(ref subActive, 0, 0) == 0) + { + await pubConn.PublishAsync("test", new byte[1], cancellationToken: timeoutCts.Token); + await Task.Delay(50, timeoutCts.Token); + } + + var sent = 0; + var data = new byte[1048576]; // 1MiB + var sendTask = Task.Run(async () => + { + while (!stopCts.IsCancellationRequested) + { + await pubConn.PublishAsync("test", data, cancellationToken: timeoutCts.Token); + Interlocked.Increment(ref sent); + } + + await pubConn.PublishAsync("test", new byte[2], cancellationToken: timeoutCts.Token); + }); + + var reconnects = 0; + var restartTask = Task.Run(async () => + { + while (!stopCts.IsCancellationRequested) + { + if (pubConn is { ConnectionState: NatsConnectionState.Open, TestSocket.WaitForClosed.IsCanceled: false }) + { + await pubConn.TestSocket.AbortConnectionAsync(timeoutCts.Token); + Interlocked.Increment(ref reconnects); + } + + // give it some time to send more + await Task.Delay(100, timeoutCts.Token); + } + }); + + await Task.WhenAll(subTask, sendTask, restartTask); + Assert.True(reconnects > 0, "connection did not reconnect"); + Assert.Equal(sent, received); + } } From 12512f7ef9998caef2c38fab7db3b3fdc964faeb Mon Sep 17 00:00:00 2001 From: Caleb Lloyd Date: Fri, 26 Jan 2024 11:15:30 -0500 Subject: [PATCH 2/3] socket is nullable Signed-off-by: Caleb Lloyd --- src/NATS.Client.Core/NatsConnection.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index 05c311839..75c1fe17f 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -134,7 +134,7 @@ public NatsConnectionState ConnectionState internal ObjectPool ObjectPool => _pool; // only used for internal testing - internal ISocketConnection TestSocket => _socket; + internal ISocketConnection? TestSocket => _socket; /// /// Connect socket and write CONNECT command to nats server. From b3925ed36a96661e192f5ac0761fe59aabf178fc Mon Sep 17 00:00:00 2001 From: Caleb Lloyd Date: Tue, 30 Jan 2024 10:47:05 -0500 Subject: [PATCH 3/3] check loss <= 1% Signed-off-by: Caleb Lloyd --- .../ConnectionRetryTest.cs | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs b/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs index 23c12232d..261337c60 100644 --- a/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs +++ b/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs @@ -10,12 +10,7 @@ public class ConnectionRetryTest public async Task Max_retry_reached_after_disconnect() { await using var server = NatsServer.Start(); - await using var nats = server.CreateClientConnection(new NatsOpts - { - MaxReconnectRetry = 2, - ReconnectWaitMax = TimeSpan.Zero, - ReconnectWaitMin = TimeSpan.FromSeconds(.1), - }); + await using var nats = server.CreateClientConnection(new NatsOpts { MaxReconnectRetry = 2, ReconnectWaitMax = TimeSpan.Zero, ReconnectWaitMin = TimeSpan.FromSeconds(.1), }); var signal = new WaitSignal(); nats.ReconnectFailed += (_, _) => @@ -37,12 +32,7 @@ public async Task Max_retry_reached_after_disconnect() public async Task Retry_and_connect_after_disconnected() { await using var server = NatsServer.Start(); - await using var nats = server.CreateClientConnection(new NatsOpts - { - MaxReconnectRetry = 10, - ReconnectWaitMax = TimeSpan.Zero, - ReconnectWaitMin = TimeSpan.FromSeconds(2), - }); + await using var nats = server.CreateClientConnection(new NatsOpts { MaxReconnectRetry = 10, ReconnectWaitMax = TimeSpan.Zero, ReconnectWaitMin = TimeSpan.FromSeconds(2), }); var signal = new WaitSignal(); nats.ReconnectFailed += (_, _) => @@ -139,6 +129,12 @@ public async Task Reconnect_doesnt_drop_partially_sent_msgs() await Task.WhenAll(subTask, sendTask, restartTask); Assert.True(reconnects > 0, "connection did not reconnect"); - Assert.Equal(sent, received); + Assert.True(received <= sent, $"duplicate messages sent on wire- {sent} sent, {received} received"); + + // some messages may still be lost, as socket could have been disconnected + // after socket.WriteAsync returned, but before OS sent + // check to ensure that the loss was < 1% + var loss = 100.0 - (100.0 * received / sent); + Assert.True(loss <= 1.0, $"message loss of {loss:F}% was above 1% - {sent} sent, {received} received"); } }