Skip to content

Commit

Permalink
Test that reconnect doesn't drop partially sent messages (#360)
Browse files Browse the repository at this point in the history
* test reconnect doesn't drop partially sent msgs

Signed-off-by: Caleb Lloyd <[email protected]>

* socket is nullable

Signed-off-by: Caleb Lloyd <[email protected]>

* check loss <= 1%

Signed-off-by: Caleb Lloyd <[email protected]>

---------

Signed-off-by: Caleb Lloyd <[email protected]>
  • Loading branch information
caleblloyd authored Jan 30, 2024
1 parent e3d5198 commit ed1dca8
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 12 deletions.
3 changes: 3 additions & 0 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ public NatsConnectionState ConnectionState

/// <summary>
/// Gets the object pool instance held by this connection.
/// </summary>
internal ObjectPool ObjectPool
{
    get { return _pool; }
}

/// <summary>
/// Gets the socket connection currently held by this connection; may be <c>null</c>.
/// Only used for internal testing.
/// </summary>
internal ISocketConnection? TestSocket
{
    get { return _socket; }
}

/// <summary>
/// Connect socket and write CONNECT command to nats server.
/// </summary>
Expand Down
97 changes: 85 additions & 12 deletions tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,7 @@ public class ConnectionRetryTest
public async Task Max_retry_reached_after_disconnect()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection(new NatsOpts
{
MaxReconnectRetry = 2,
ReconnectWaitMax = TimeSpan.Zero,
ReconnectWaitMin = TimeSpan.FromSeconds(.1),
});
await using var nats = server.CreateClientConnection(new NatsOpts { MaxReconnectRetry = 2, ReconnectWaitMax = TimeSpan.Zero, ReconnectWaitMin = TimeSpan.FromSeconds(.1), });

var signal = new WaitSignal();
nats.ReconnectFailed += (_, _) =>
Expand All @@ -37,12 +32,7 @@ public async Task Max_retry_reached_after_disconnect()
public async Task Retry_and_connect_after_disconnected()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection(new NatsOpts
{
MaxReconnectRetry = 10,
ReconnectWaitMax = TimeSpan.Zero,
ReconnectWaitMin = TimeSpan.FromSeconds(2),
});
await using var nats = server.CreateClientConnection(new NatsOpts { MaxReconnectRetry = 10, ReconnectWaitMax = TimeSpan.Zero, ReconnectWaitMin = TimeSpan.FromSeconds(2), });

var signal = new WaitSignal();
nats.ReconnectFailed += (_, _) =>
Expand All @@ -64,4 +54,87 @@ public async Task Retry_and_connect_after_disconnected()
var rtt = await nats.PingAsync(cts.Token);
Assert.True(rtt > TimeSpan.Zero);
}

/// <summary>
/// Publishes 1 MiB messages in a loop while the publisher's socket is repeatedly aborted,
/// then checks that at least one reconnect happened, that the subscriber did not receive
/// more messages than were published, and that no more than 1% of the messages were lost.
/// </summary>
/// <remarks>
/// Payload length is used as an in-band signal on subject "test":
/// 1 byte = "subscriber is active" probe, 2 bytes = end-of-stream marker,
/// any other length = a data message that is counted as received.
/// </remarks>
[Fact]
public async Task Reconnect_doesnt_drop_partially_sent_msgs()
{
    await using var server = NatsServer.Start();

    await using var pubConn = server.CreateClientConnection();
    await pubConn.ConnectAsync();

    // Both sources are created with a delay, so dispose them when the test ends.
    // The tokens are captured up front so the background lambdas never touch the
    // CancellationTokenSource instances themselves.
    using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
    using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    var timeout = timeoutCts.Token; // hard upper bound for every awaited operation
    var stop = stopCts.Token;       // signals the publish/abort loops to wind down

    var received = 0;
    var subActive = 0;
    var subTask = Task.Run(async () =>
    {
        await using var subConn = server.CreateClientConnection();
        await using var sub = await subConn.SubscribeCoreAsync<NatsMemoryOwner<byte>>("test", cancellationToken: timeout);
        await foreach (var msg in sub.Msgs.ReadAllAsync(timeout))
        {
            using (msg.Data)
            {
                if (msg.Data.Length == 1)
                {
                    // probe message: tells the main flow that the subscription is live
                    Interlocked.Increment(ref subActive);
                }
                else if (msg.Data.Length == 2)
                {
                    // end-of-stream marker published after the send loop stops
                    break;
                }
                else
                {
                    Interlocked.Increment(ref received);
                }
            }
        }
    });

    // Keep sending 1-byte probes until the subscriber has seen one, so that no
    // data message is published before the subscription is active.
    while (Volatile.Read(ref subActive) == 0)
    {
        await pubConn.PublishAsync("test", new byte[1], cancellationToken: timeout);
        await Task.Delay(50, timeout);
    }

    var sent = 0;
    var data = new byte[1048576]; // 1MiB
    var sendTask = Task.Run(async () =>
    {
        while (!stop.IsCancellationRequested)
        {
            await pubConn.PublishAsync("test", data, cancellationToken: timeout);
            Interlocked.Increment(ref sent);
        }

        // 2-byte marker lets the subscriber loop exit
        await pubConn.PublishAsync("test", new byte[2], cancellationToken: timeout);
    });

    var reconnects = 0;
    var restartTask = Task.Run(async () =>
    {
        while (!stop.IsCancellationRequested)
        {
            // Capture the socket in the same pattern that validates it: TestSocket is
            // nullable and may be replaced by a concurrent reconnect, so re-reading the
            // property after the check could observe a different (or null) socket.
            if (pubConn is { ConnectionState: NatsConnectionState.Open, TestSocket: { WaitForClosed.IsCanceled: false } socket })
            {
                await socket.AbortConnectionAsync(timeout);
                Interlocked.Increment(ref reconnects);
            }

            // give it some time to send more
            await Task.Delay(100, timeout);
        }
    });

    await Task.WhenAll(subTask, sendTask, restartTask);
    Assert.True(reconnects > 0, "connection did not reconnect");

    // without this guard a run that published nothing would surface as a NaN loss percentage
    Assert.True(sent > 0, "no messages were sent");
    Assert.True(received <= sent, $"duplicate messages sent on wire- {sent} sent, {received} received");

    // some messages may still be lost, as socket could have been disconnected
    // after socket.WriteAsync returned, but before OS sent
    // check to ensure that the loss was <= 1%
    var loss = 100.0 - (100.0 * received / sent);
    Assert.True(loss <= 1.0, $"message loss of {loss:F}% was above 1% - {sent} sent, {received} received");
}
}

0 comments on commit ed1dca8

Please sign in to comment.