Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OnSocketAvailableAsync Hook #647

Merged
merged 6 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/NATS.Client.Core/INatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ public interface INatsConnection : INatsClient
/// </summary>
NatsHeaderParser HeaderParser { get; }

/// <summary>
/// Hook before TCP connection open.
/// </summary>
Func<(string Host, int Port), ValueTask<(string Host, int Port)>>? OnConnectingAsync { get; set; }

/// <summary>
/// Hook when socket is available.
/// </summary>
Func<ISocketConnection, ValueTask<ISocketConnection>>? OnSocketAvailableAsync { get; set; }

/// <summary>
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace NATS.Client.Core.Internal;
namespace NATS.Client.Core;

internal interface ISocketConnection : IAsyncDisposable
public interface ISocketConnection : IAsyncDisposable
{
public Task<Exception> WaitForClosed { get; }

Expand Down
22 changes: 17 additions & 5 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ internal enum NatsEvent
public partial class NatsConnection : INatsConnection
{
#pragma warning disable SA1401
/// <summary>
/// Hook before TCP connection open.
/// </summary>
public Func<(string Host, int Port), ValueTask<(string Host, int Port)>>? OnConnectingAsync;

internal readonly ConnectionStatsCounter Counter; // allow to call from external sources
internal volatile ServerInfo? WritableServerInfo;

Expand Down Expand Up @@ -135,6 +130,11 @@ private set

public NatsHeaderParser HeaderParser { get; }

// Hooks
public Func<(string Host, int Port), ValueTask<(string Host, int Port)>>? OnConnectingAsync { get; set; }

public Func<ISocketConnection, ValueTask<ISocketConnection>>? OnSocketAvailableAsync { get; set; }

internal bool IsDisposed
{
get => Interlocked.CompareExchange(ref _isDisposed, 0, 0) == 1;
Expand Down Expand Up @@ -339,6 +339,12 @@ private async ValueTask InitialConnectAsync()
}
}

if (OnSocketAvailableAsync != null)
{
_logger.LogInformation(NatsLogEvents.Connection, "Try to invoke OnSocketAvailable");
_socket = await OnSocketAvailableAsync(_socket).ConfigureAwait(false);
}

_currentConnectUri = uri;
break;
}
Expand Down Expand Up @@ -629,6 +635,12 @@ private async void ReconnectLoop()
}
}

if (OnSocketAvailableAsync != null)
{
_logger.LogInformation(NatsLogEvents.Connection, "Try to invoke OnSocketAvailable");
_socket = await OnSocketAvailableAsync(_socket).ConfigureAwait(false);
}

_currentConnectUri = url;
}
else
Expand Down
54 changes: 54 additions & 0 deletions tests/NATS.Client.Core.Tests/NatsConnectionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,60 @@ public void NewInboxVeryLongPrefixReturnsPrefixWithNuid()

Assert.Matches("A{512}\\.[A-z0-9]{22}", inbox);
}

[Fact]
public async Task OnSocketAvailableAsync_ShouldBeInvokedOnInitialConnection()
{
// Arrange
await using var server = NatsServer.Start();
var clientOpts = server.ClientOpts(NatsOpts.Default);

var wasInvoked = false;
var nats = new NatsConnection(clientOpts);
nats.OnSocketAvailableAsync = async socket =>
{
wasInvoked = true;
await Task.Delay(10);
return socket;
};

// Act
await nats.ConnectAsync();

// Assert
Assert.True(wasInvoked, "OnSocketAvailableAsync should be invoked on initial connection.");
}

[Fact]
public async Task OnSocketAvailableAsync_ShouldBeInvokedOnReconnection()
{
// Arrange
await using var server = NatsServer.Start();
var clientOpts = server.ClientOpts(NatsOpts.Default);

var invocationCount = 0;
var nats = new NatsConnection(clientOpts);
nats.OnSocketAvailableAsync = async socket =>
{
invocationCount++;
await Task.Delay(10);
return socket;
};

// Simulate initial connection
await nats.ConnectAsync();

// Simulate disconnection
await server.StopAsync();

// Act
// Simulate reconnection
server.StartServerProcess();
await nats.ConnectAsync();

// Assert
Assert.Equal(2, invocationCount);
}
}

[JsonSerializable(typeof(SampleClass))]
Expand Down
4 changes: 4 additions & 0 deletions tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ public class MockConnection : INatsConnection

public NatsHeaderParser HeaderParser { get; } = new NatsHeaderParser(Encoding.UTF8);

public Func<(string Host, int Port), ValueTask<(string Host, int Port)>>? OnConnectingAsync { get; set; }

public Func<ISocketConnection, ValueTask<ISocketConnection>>? OnSocketAvailableAsync { get; set; }

public ValueTask<TimeSpan> PingAsync(CancellationToken cancellationToken = default) => throw new NotImplementedException();

public ValueTask PublishAsync<T>(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize<T>? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) => throw new NotImplementedException();
Expand Down