Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test that reconnect doesn't drop partially sent messages #360

Merged
merged 3 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ public NatsConnectionState ConnectionState

internal ObjectPool ObjectPool => _pool;

// only used for internal testing
internal ISocketConnection? TestSocket => _socket;
caleblloyd marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Connect socket and write CONNECT command to nats server.
/// </summary>
Expand Down
97 changes: 85 additions & 12 deletions tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,7 @@ public class ConnectionRetryTest
public async Task Max_retry_reached_after_disconnect()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection(new NatsOpts
{
MaxReconnectRetry = 2,
ReconnectWaitMax = TimeSpan.Zero,
ReconnectWaitMin = TimeSpan.FromSeconds(.1),
});
await using var nats = server.CreateClientConnection(new NatsOpts { MaxReconnectRetry = 2, ReconnectWaitMax = TimeSpan.Zero, ReconnectWaitMin = TimeSpan.FromSeconds(.1), });

var signal = new WaitSignal();
nats.ReconnectFailed += (_, _) =>
Expand All @@ -37,12 +32,7 @@ public async Task Max_retry_reached_after_disconnect()
public async Task Retry_and_connect_after_disconnected()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection(new NatsOpts
{
MaxReconnectRetry = 10,
ReconnectWaitMax = TimeSpan.Zero,
ReconnectWaitMin = TimeSpan.FromSeconds(2),
});
await using var nats = server.CreateClientConnection(new NatsOpts { MaxReconnectRetry = 10, ReconnectWaitMax = TimeSpan.Zero, ReconnectWaitMin = TimeSpan.FromSeconds(2), });

var signal = new WaitSignal();
nats.ReconnectFailed += (_, _) =>
Expand All @@ -64,4 +54,87 @@ public async Task Retry_and_connect_after_disconnected()
var rtt = await nats.PingAsync(cts.Token);
Assert.True(rtt > TimeSpan.Zero);
}

[Fact]
public async Task Reconnect_doesnt_drop_partially_sent_msgs()
{
await using var server = NatsServer.Start();

await using var pubConn = server.CreateClientConnection();
await pubConn.ConnectAsync();

var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

var received = 0;
var subActive = 0;
var subTask = Task.Run(async () =>
{
await using var subConn = server.CreateClientConnection();
await using var sub = await subConn.SubscribeCoreAsync<NatsMemoryOwner<byte>>("test", cancellationToken: timeoutCts.Token);
await foreach (var msg in sub.Msgs.ReadAllAsync(timeoutCts.Token))
{
using (msg.Data)
{
if (msg.Data.Length == 1)
{
Interlocked.Increment(ref subActive);
}
else if (msg.Data.Length == 2)
{
break;
}
else
{
Interlocked.Increment(ref received);
}
}
}
});

while (Interlocked.CompareExchange(ref subActive, 0, 0) == 0)
{
await pubConn.PublishAsync("test", new byte[1], cancellationToken: timeoutCts.Token);
await Task.Delay(50, timeoutCts.Token);
}

var sent = 0;
var data = new byte[1048576]; // 1MiB
var sendTask = Task.Run(async () =>
{
while (!stopCts.IsCancellationRequested)
{
await pubConn.PublishAsync("test", data, cancellationToken: timeoutCts.Token);
Interlocked.Increment(ref sent);
}

await pubConn.PublishAsync("test", new byte[2], cancellationToken: timeoutCts.Token);
});

var reconnects = 0;
var restartTask = Task.Run(async () =>
{
while (!stopCts.IsCancellationRequested)
{
if (pubConn is { ConnectionState: NatsConnectionState.Open, TestSocket.WaitForClosed.IsCanceled: false })
{
await pubConn.TestSocket.AbortConnectionAsync(timeoutCts.Token);
Interlocked.Increment(ref reconnects);
}

// give it some time to send more
await Task.Delay(100, timeoutCts.Token);
}
});

await Task.WhenAll(subTask, sendTask, restartTask);
Assert.True(reconnects > 0, "connection did not reconnect");
Assert.True(received <= sent, $"duplicate messages sent on wire- {sent} sent, {received} received");

// some messages may still be lost, as socket could have been disconnected
// after socket.WriteAsync returned, but before OS sent
// check to ensure that the loss was < 1%
var loss = 100.0 - (100.0 * received / sent);
Assert.True(loss <= 1.0, $"message loss of {loss:F}% was above 1% - {sent} sent, {received} received");
}
}
Loading